MultipleOutputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.IOException;
  20. import java.util.Iterator;
  21. import java.util.TreeMap;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.mapred.JobConf;
  25. import org.apache.hadoop.mapred.FileOutputFormat;
  26. import org.apache.hadoop.mapred.RecordWriter;
  27. import org.apache.hadoop.mapred.Reporter;
  28. import org.apache.hadoop.util.Progressable;
  29. /**
  30.  * This abstract class extends the FileOutputFormat, allowing to write the
  31.  * output data to different output files. There are three basic use cases for
  32.  * this class.
  33.  * 
  34.  * Case one: This class is used for a map reduce job with at least one reducer.
  35.  * The reducer wants to write data to different files depending on the actual
  36.  * keys. It is assumed that a key (or value) encodes the actual key (value)
  37.  * and the desired location for the actual key (value).
  38.  * 
  39.  * Case two: This class is used for a map only job. The job wants to use an
  40.  * output file name that is either a part of the input file name of the input
  41.  * data, or some derivation of it.
  42.  * 
  43.  * Case three: This class is used for a map only job. The job wants to use an
  44.  * output file name that depends on both the keys and the input file name,
  45.  * 
  46.  */
  47. public abstract class MultipleOutputFormat<K, V>
  48. extends FileOutputFormat<K, V> {
  49.   /**
  50.    * Create a composite record writer that can write key/value data to different
  51.    * output files
  52.    * 
  53.    * @param fs
  54.    *          the file system to use
  55.    * @param job
  56.    *          the job conf for the job
  57.    * @param name
  58.    *          the leaf file name for the output file (such as part-00000")
  59.    * @param arg3
  60.    *          a progressable for reporting progress.
  61.    * @return a composite record writer
  62.    * @throws IOException
  63.    */
  64.   public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
  65.       String name, Progressable arg3) throws IOException {
  66.     final FileSystem myFS = fs;
  67.     final String myName = generateLeafFileName(name);
  68.     final JobConf myJob = job;
  69.     final Progressable myProgressable = arg3;
  70.     return new RecordWriter<K, V>() {
  71.       // a cache storing the record writers for different output files.
  72.       TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();
  73.       public void write(K key, V value) throws IOException {
  74.         // get the file name based on the key
  75.         String keyBasedPath = generateFileNameForKeyValue(key, value, myName);
  76.         // get the file name based on the input file name
  77.         String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);
  78.         // get the actual key
  79.         K actualKey = generateActualKey(key, value);
  80.         V actualValue = generateActualValue(key, value);
  81.         RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
  82.         if (rw == null) {
  83.           // if we don't have the record writer yet for the final path, create
  84.           // one
  85.           // and add it to the cache
  86.           rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
  87.           this.recordWriters.put(finalPath, rw);
  88.         }
  89.         rw.write(actualKey, actualValue);
  90.       };
  91.       public void close(Reporter reporter) throws IOException {
  92.         Iterator<String> keys = this.recordWriters.keySet().iterator();
  93.         while (keys.hasNext()) {
  94.           RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
  95.           rw.close(reporter);
  96.         }
  97.         this.recordWriters.clear();
  98.       };
  99.     };
  100.   }
  101.   /**
  102.    * Generate the leaf name for the output file name. The default behavior does
  103.    * not change the leaf file name (such as part-00000)
  104.    * 
  105.    * @param name
  106.    *          the leaf file name for the output file
  107.    * @return the given leaf file name
  108.    */
  109.   protected String generateLeafFileName(String name) {
  110.     return name;
  111.   }
  112.   /**
  113.    * Generate the file output file name based on the given key and the leaf file
  114.    * name. The default behavior is that the file name does not depend on the
  115.    * key.
  116.    * 
  117.    * @param key
  118.    *          the key of the output data
  119.    * @param name
  120.    *          the leaf file name
  121.    * @return generated file name
  122.    */
  123.   protected String generateFileNameForKeyValue(K key, V value, String name) {
  124.     return name;
  125.   }
  126.   /**
  127.    * Generate the actual key from the given key/value. The default behavior is that
  128.    * the actual key is equal to the given key
  129.    * 
  130.    * @param key
  131.    *          the key of the output data
  132.    * @param value
  133.    *          the value of the output data
  134.    * @return the actual key derived from the given key/value
  135.    */
  136.   protected K generateActualKey(K key, V value) {
  137.     return key;
  138.   }
  139.   
  140.   /**
  141.    * Generate the actual value from the given key and value. The default behavior is that
  142.    * the actual value is equal to the given value
  143.    * 
  144.    * @param key
  145.    *          the key of the output data
  146.    * @param value
  147.    *          the value of the output data
  148.    * @return the actual value derived from the given key/value
  149.    */
  150.   protected V generateActualValue(K key, V value) {
  151.     return value;
  152.   }
  153.   
  154.   /**
  155.    * Generate the outfile name based on a given anme and the input file name. If
  156.    * the map input file does not exists (i.e. this is not for a map only job),
  157.    * the given name is returned unchanged. If the config value for
  158.    * "num.of.trailing.legs.to.use" is not set, or set 0 or negative, the given
  159.    * name is returned unchanged. Otherwise, return a file name consisting of the
  160.    * N trailing legs of the input file name where N is the config value for
  161.    * "num.of.trailing.legs.to.use".
  162.    * 
  163.    * @param job
  164.    *          the job config
  165.    * @param name
  166.    *          the output file name
  167.    * @return the outfile name based on a given anme and the input file name.
  168.    */
  169.   protected String getInputFileBasedOutputFileName(JobConf job, String name) {
  170.     String infilepath = job.get("map.input.file");
  171.     if (infilepath == null) {
  172.       // if the map input file does not exists, then return the given name
  173.       return name;
  174.     }
  175.     int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
  176.     if (numOfTrailingLegsToUse <= 0) {
  177.       return name;
  178.     }
  179.     Path infile = new Path(infilepath);
  180.     Path parent = infile.getParent();
  181.     String midName = infile.getName();
  182.     Path outPath = new Path(midName);
  183.     for (int i = 1; i < numOfTrailingLegsToUse; i++) {
  184.       if (parent == null) break;
  185.       midName = parent.getName();
  186.       if (midName.length() == 0) break;
  187.       parent = parent.getParent();
  188.       outPath = new Path(midName, outPath);
  189.     }
  190.     return outPath.toString();
  191.   }
  192.   /**
  193.    * 
  194.    * @param fs
  195.    *          the file system to use
  196.    * @param job
  197.    *          a job conf object
  198.    * @param name
  199.    *          the name of the file over which a record writer object will be
  200.    *          constructed
  201.    * @param arg3
  202.    *          a progressable object
  203.    * @return A RecordWriter object over the given file
  204.    * @throws IOException
  205.    */
  206.   abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
  207.       JobConf job, String name, Progressable arg3) throws IOException;
  208. }