FileOutputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.text.NumberFormat;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.io.compress.CompressionCodec;
  24. import org.apache.hadoop.util.Progressable;
  25. /** A base class for {@link OutputFormat}. */
  26. public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
  27.   /**
  28.    * Set whether the output of the job is compressed.
  29.    * @param conf the {@link JobConf} to modify
  30.    * @param compress should the output of the job be compressed?
  31.    */
  32.   public static void setCompressOutput(JobConf conf, boolean compress) {
  33.     conf.setBoolean("mapred.output.compress", compress);
  34.   }
  35.   
  36.   /**
  37.    * Is the job output compressed?
  38.    * @param conf the {@link JobConf} to look in
  39.    * @return <code>true</code> if the job output should be compressed,
  40.    *         <code>false</code> otherwise
  41.    */
  42.   public static boolean getCompressOutput(JobConf conf) {
  43.     return conf.getBoolean("mapred.output.compress", false);
  44.   }
  45.   
  46.   /**
  47.    * Set the {@link CompressionCodec} to be used to compress job outputs.
  48.    * @param conf the {@link JobConf} to modify
  49.    * @param codecClass the {@link CompressionCodec} to be used to
  50.    *                   compress the job outputs
  51.    */
  52.   public static void 
  53.   setOutputCompressorClass(JobConf conf, 
  54.                            Class<? extends CompressionCodec> codecClass) {
  55.     setCompressOutput(conf, true);
  56.     conf.setClass("mapred.output.compression.codec", codecClass, 
  57.                   CompressionCodec.class);
  58.   }
  59.   
  60.   /**
  61.    * Get the {@link CompressionCodec} for compressing the job outputs.
  62.    * @param conf the {@link JobConf} to look in
  63.    * @param defaultValue the {@link CompressionCodec} to return if not set
  64.    * @return the {@link CompressionCodec} to be used to compress the 
  65.    *         job outputs
  66.    * @throws IllegalArgumentException if the class was specified, but not found
  67.    */
  68.   public static Class<? extends CompressionCodec> 
  69.   getOutputCompressorClass(JobConf conf, 
  70.                        Class<? extends CompressionCodec> defaultValue) {
  71.     Class<? extends CompressionCodec> codecClass = defaultValue;
  72.     
  73.     String name = conf.get("mapred.output.compression.codec");
  74.     if (name != null) {
  75.       try {
  76.         codecClass = 
  77.          conf.getClassByName(name).asSubclass(CompressionCodec.class);
  78.       } catch (ClassNotFoundException e) {
  79.         throw new IllegalArgumentException("Compression codec " + name + 
  80.                                            " was not found.", e);
  81.       }
  82.     }
  83.     return codecClass;
  84.   }
  85.   
  86.   public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,
  87.                                                JobConf job, String name,
  88.                                                Progressable progress)
  89.     throws IOException;
  90.   public void checkOutputSpecs(FileSystem ignored, JobConf job) 
  91.     throws FileAlreadyExistsException, 
  92.            InvalidJobConfException, IOException {
  93.     // Ensure that the output directory is set and not already there
  94.     Path outDir = getOutputPath(job);
  95.     if (outDir == null && job.getNumReduceTasks() != 0) {
  96.       throw new InvalidJobConfException("Output directory not set in JobConf.");
  97.     }
  98.     if (outDir != null) {
  99.       FileSystem fs = outDir.getFileSystem(job);
  100.       // normalize the output directory
  101.       outDir = fs.makeQualified(outDir);
  102.       setOutputPath(job, outDir);
  103.       // check its existence
  104.       if (fs.exists(outDir)) {
  105.         throw new FileAlreadyExistsException("Output directory " + outDir + 
  106.                                              " already exists");
  107.       }
  108.     }
  109.   }
  110.   /**
  111.    * Set the {@link Path} of the output directory for the map-reduce job.
  112.    *
  113.    * @param conf The configuration of the job.
  114.    * @param outputDir the {@link Path} of the output directory for 
  115.    * the map-reduce job.
  116.    */
  117.   public static void setOutputPath(JobConf conf, Path outputDir) {
  118.     outputDir = new Path(conf.getWorkingDirectory(), outputDir);
  119.     conf.set("mapred.output.dir", outputDir.toString());
  120.   }
  121.   /**
  122.    * Set the {@link Path} of the task's temporary output directory 
  123.    * for the map-reduce job.
  124.    * 
  125.    * <p><i>Note</i>: Task output path is set by the framework.
  126.    * </p>
  127.    * @param conf The configuration of the job.
  128.    * @param outputDir the {@link Path} of the output directory 
  129.    * for the map-reduce job.
  130.    */
  131.   
  132.   static void setWorkOutputPath(JobConf conf, Path outputDir) {
  133.     outputDir = new Path(conf.getWorkingDirectory(), outputDir);
  134.     conf.set("mapred.work.output.dir", outputDir.toString());
  135.   }
  136.   
  137.   /**
  138.    * Get the {@link Path} to the output directory for the map-reduce job.
  139.    * 
  140.    * @return the {@link Path} to the output directory for the map-reduce job.
  141.    * @see FileOutputFormat#getWorkOutputPath(JobConf)
  142.    */
  143.   public static Path getOutputPath(JobConf conf) {
  144.     String name = conf.get("mapred.output.dir");
  145.     return name == null ? null: new Path(name);
  146.   }
  147.   
  148.   /**
  149.    *  Get the {@link Path} to the task's temporary output directory 
  150.    *  for the map-reduce job
  151.    *  
  152.    * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
  153.    * 
  154.    * <p><i>Note:</i> The following is valid only if the {@link OutputCommitter}
  155.    *  is {@link FileOutputCommitter}. If <code>OutputCommitter</code> is not 
  156.    *  a <code>FileOutputCommitter</code>, the task's temporary output
  157.    *  directory is same as {@link #getOutputPath(JobConf)} i.e.
  158.    *  <tt>${mapred.output.dir}$</tt></p>
  159.    *  
  160.    * <p>Some applications need to create/write-to side-files, which differ from
  161.    * the actual job-outputs.
  162.    * 
  163.    * <p>In such cases there could be issues with 2 instances of the same TIP 
  164.    * (running simultaneously e.g. speculative tasks) trying to open/write-to the
  165.    * same file (path) on HDFS. Hence the application-writer will have to pick 
  166.    * unique names per task-attempt (e.g. using the attemptid, say 
  167.    * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p> 
  168.    * 
  169.    * <p>To get around this the Map-Reduce framework helps the application-writer 
  170.    * out by maintaining a special 
  171.    * <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> 
  172.    * sub-directory for each task-attempt on HDFS where the output of the 
  173.    * task-attempt goes. On successful completion of the task-attempt the files 
  174.    * in the <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> (only) 
  175.    * are <i>promoted</i> to <tt>${mapred.output.dir}</tt>. Of course, the 
  176.    * framework discards the sub-directory of unsuccessful task-attempts. This 
  177.    * is completely transparent to the application.</p>
  178.    * 
  179.    * <p>The application-writer can take advantage of this by creating any 
  180.    * side-files required in <tt>${mapred.work.output.dir}</tt> during execution 
  181.    * of his reduce-task i.e. via {@link #getWorkOutputPath(JobConf)}, and the 
  182.    * framework will move them out similarly - thus she doesn't have to pick 
  183.    * unique paths per task-attempt.</p>
  184.    * 
  185.    * <p><i>Note</i>: the value of <tt>${mapred.work.output.dir}</tt> during 
  186.    * execution of a particular task-attempt is actually 
  187.    * <tt>${mapred.output.dir}/_temporary/_{$taskid}</tt>, and this value is 
  188.    * set by the map-reduce framework. So, just create any side-files in the 
  189.    * path  returned by {@link #getWorkOutputPath(JobConf)} from map/reduce 
  190.    * task to take advantage of this feature.</p>
  191.    * 
  192.    * <p>The entire discussion holds true for maps of jobs with 
  193.    * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, 
  194.    * goes directly to HDFS.</p> 
  195.    * 
  196.    * @return the {@link Path} to the task's temporary output directory 
  197.    * for the map-reduce job.
  198.    */
  199.   public static Path getWorkOutputPath(JobConf conf) {
  200.     String name = conf.get("mapred.work.output.dir");
  201.     return name == null ? null: new Path(name);
  202.   }
  203.   /**
  204.    * Helper function to create the task's temporary output directory and 
  205.    * return the path to the task's output file.
  206.    * 
  207.    * @param conf job-configuration
  208.    * @param name temporary task-output filename
  209.    * @return path to the task's temporary output file
  210.    * @throws IOException
  211.    */
  212.   public static Path getTaskOutputPath(JobConf conf, String name) 
  213.   throws IOException {
  214.     // ${mapred.out.dir}
  215.     Path outputPath = getOutputPath(conf);
  216.     if (outputPath == null) {
  217.       throw new IOException("Undefined job output-path");
  218.     }
  219.     OutputCommitter committer = conf.getOutputCommitter();
  220.     Path workPath = outputPath;
  221.     TaskAttemptContext context = new TaskAttemptContext(conf,
  222.                 TaskAttemptID.forName(conf.get("mapred.task.id")));
  223.     if (committer instanceof FileOutputCommitter) {
  224.       workPath = ((FileOutputCommitter)committer).getWorkPath(context,
  225.                                                               outputPath);
  226.     }
  227.     
  228.     // ${mapred.out.dir}/_temporary/_${taskid}/${name}
  229.     return new Path(workPath, name);
  230.   } 
  231.   /**
  232.    * Helper function to generate a name that is unique for the task.
  233.    *
  234.    * <p>The generated name can be used to create custom files from within the
  235.    * different tasks for the job, the names for different tasks will not collide
  236.    * with each other.</p>
  237.    *
  238.    * <p>The given name is postfixed with the task type, 'm' for maps, 'r' for
  239.    * reduces and the task partition number. For example, give a name 'test'
  240.    * running on the first map o the job the generated name will be
  241.    * 'test-m-00000'.</p>
  242.    *
  243.    * @param conf the configuration for the job.
  244.    * @param name the name to make unique.
  245.    * @return a unique name accross all tasks of the job.
  246.    */
  247.   public static String getUniqueName(JobConf conf, String name) {
  248.     int partition = conf.getInt("mapred.task.partition", -1);
  249.     if (partition == -1) {
  250.       throw new IllegalArgumentException(
  251.         "This method can only be called from within a Job");
  252.     }
  253.     String taskType = (conf.getBoolean("mapred.task.is.map", true)) ? "m" : "r";
  254.     NumberFormat numberFormat = NumberFormat.getInstance();
  255.     numberFormat.setMinimumIntegerDigits(5);
  256.     numberFormat.setGroupingUsed(false);
  257.     return name + "-" + taskType + "-" + numberFormat.format(partition);
  258.   }
  259.   /**
  260.    * Helper function to generate a {@link Path} for a file that is unique for
  261.    * the task within the job output directory.
  262.    *
  263.    * <p>The path can be used to create custom files from within the map and
  264.    * reduce tasks. The path name will be unique for each task. The path parent
  265.    * will be the job output directory.</p>ls
  266.    *
  267.    * <p>This method uses the {@link #getUniqueName} method to make the file name
  268.    * unique for the task.</p>
  269.    *
  270.    * @param conf the configuration for the job.
  271.    * @param name the name for the file.
  272.    * @return a unique path accross all tasks of the job.
  273.    */
  274.   public static Path getPathForCustomFile(JobConf conf, String name) {
  275.     return new Path(getWorkOutputPath(conf), getUniqueName(conf, name));
  276.   }
  277. }