MultipleOutputs.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:19k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import org.apache.hadoop.fs.FileSystem;
  20. import org.apache.hadoop.io.Writable;
  21. import org.apache.hadoop.io.WritableComparable;
  22. import org.apache.hadoop.mapred.*;
  23. import org.apache.hadoop.util.Progressable;
  24. import java.io.IOException;
  25. import java.util.*;
  26. /**
  27.  * The MultipleOutputs class simplifies writting to additional outputs other
  28.  * than the job default output via the <code>OutputCollector</code> passed to
  29.  * the <code>map()</code> and <code>reduce()</code> methods of the
  30.  * <code>Mapper</code> and <code>Reducer</code> implementations.
  31.  * <p/>
  32.  * Each additional output, or named output, may be configured with its own
  33.  * <code>OutputFormat</code>, with its own key class and with its own value
  34.  * class.
  35.  * <p/>
  36.  * A named output can be a single file or a multi file. The later is refered as
  37.  * a multi named output.
  38.  * <p/>
  39.  * A multi named output is an unbound set of files all sharing the same
  40.  * <code>OutputFormat</code>, key class and value class configuration.
  41.  * <p/>
  42.  * When named outputs are used within a <code>Mapper</code> implementation,
  43.  * key/values written to a name output are not part of the reduce phase, only
  44.  * key/values written to the job <code>OutputCollector</code> are part of the
  45.  * reduce phase.
  46.  * <p/>
  47.  * MultipleOutputs supports counters, by default the are disabled. The counters
  48.  * group is the {@link MultipleOutputs} class name.
  49.  * </p>
  50.  * The names of the counters are the same as the named outputs. For multi
  51.  * named outputs the name of the counter is the concatenation of the named
  52.  * output, and underscore '_' and the multiname.
  53.  * <p/>
  54.  * Job configuration usage pattern is:
  55.  * <pre>
  56.  *
  57.  * JobConf conf = new JobConf();
  58.  *
  59.  * conf.setInputPath(inDir);
  60.  * FileOutputFormat.setOutputPath(conf, outDir);
  61.  *
  62.  * conf.setMapperClass(MOMap.class);
  63.  * conf.setReducerClass(MOReduce.class);
  64.  * ...
  65.  *
  66.  * // Defines additional single text based output 'text' for the job
  67.  * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
  68.  * LongWritable.class, Text.class);
  69.  *
  70.  * // Defines additional multi sequencefile based output 'sequence' for the
  71.  * // job
  72.  * MultipleOutputs.addMultiNamedOutput(conf, "seq",
  73.  *   SequenceFileOutputFormat.class,
  74.  *   LongWritable.class, Text.class);
  75.  * ...
  76.  *
  77.  * JobClient jc = new JobClient();
  78.  * RunningJob job = jc.submitJob(conf);
  79.  *
  80.  * ...
  81.  * </pre>
  82.  * <p/>
  83.  * Job configuration usage pattern is:
  84.  * <pre>
  85.  *
  86.  * public class MOReduce implements
  87.  *   Reducer&lt;WritableComparable, Writable&gt; {
  88.  * private MultipleOutputs mos;
  89.  *
  90.  * public void configure(JobConf conf) {
  91.  * ...
  92.  * mos = new MultipleOutputs(conf);
  93.  * }
  94.  *
  95.  * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
  96.  * OutputCollector output, Reporter reporter)
  97.  * throws IOException {
  98.  * ...
  99.  * mos.getCollector("text", reporter).collect(key, new Text("Hello"));
  100.  * mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
  101.  * mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
  102.  * ...
  103.  * }
  104.  *
  105.  * public void close() throws IOException {
  106.  * mos.close();
  107.  * ...
  108.  * }
  109.  *
  110.  * }
  111.  * </pre>
  112.  */
  113. public class MultipleOutputs {
  114.   private static final String NAMED_OUTPUTS = "mo.namedOutputs";
  115.   private static final String MO_PREFIX = "mo.namedOutput.";
  116.   private static final String FORMAT = ".format";
  117.   private static final String KEY = ".key";
  118.   private static final String VALUE = ".value";
  119.   private static final String MULTI = ".multi";
  120.   private static final String COUNTERS_ENABLED = "mo.counters";
  121.   /**
  122.    * Counters group used by the counters of MultipleOutputs.
  123.    */
  124.   private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
  125.   /**
  126.    * Checks if a named output is alreadyDefined or not.
  127.    *
  128.    * @param conf           job conf
  129.    * @param namedOutput    named output names
  130.    * @param alreadyDefined whether the existence/non-existence of
  131.    *                       the named output is to be checked
  132.    * @throws IllegalArgumentException if the output name is alreadyDefined or
  133.    *                                  not depending on the value of the
  134.    *                                  'alreadyDefined' parameter
  135.    */
  136.   private static void checkNamedOutput(JobConf conf, String namedOutput,
  137.                                        boolean alreadyDefined) {
  138.     List<String> definedChannels = getNamedOutputsList(conf);
  139.     if (alreadyDefined && definedChannels.contains(namedOutput)) {
  140.       throw new IllegalArgumentException("Named output '" + namedOutput +
  141.         "' already alreadyDefined");
  142.     } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
  143.       throw new IllegalArgumentException("Named output '" + namedOutput +
  144.         "' not defined");
  145.     }
  146.   }
  147.   /**
  148.    * Checks if a named output name is valid token.
  149.    *
  150.    * @param namedOutput named output Name
  151.    * @throws IllegalArgumentException if the output name is not valid.
  152.    */
  153.   private static void checkTokenName(String namedOutput) {
  154.     if (namedOutput == null || namedOutput.length() == 0) {
  155.       throw new IllegalArgumentException(
  156.         "Name cannot be NULL or emtpy");
  157.     }
  158.     for (char ch : namedOutput.toCharArray()) {
  159.       if ((ch >= 'A') && (ch <= 'Z')) {
  160.         continue;
  161.       }
  162.       if ((ch >= 'a') && (ch <= 'z')) {
  163.         continue;
  164.       }
  165.       if ((ch >= '0') && (ch <= '9')) {
  166.         continue;
  167.       }
  168.       throw new IllegalArgumentException(
  169.         "Name cannot be have a '" + ch + "' char");
  170.     }
  171.   }
  172.   /**
  173.    * Checks if a named output name is valid.
  174.    *
  175.    * @param namedOutput named output Name
  176.    * @throws IllegalArgumentException if the output name is not valid.
  177.    */
  178.   private static void checkNamedOutputName(String namedOutput) {
  179.     checkTokenName(namedOutput);
  180.     // name cannot be the name used for the default output
  181.     if (namedOutput.equals("part")) {
  182.       throw new IllegalArgumentException(
  183.         "Named output name cannot be 'part'");
  184.     }
  185.   }
  186.   /**
  187.    * Returns list of channel names.
  188.    *
  189.    * @param conf job conf
  190.    * @return List of channel Names
  191.    */
  192.   public static List<String> getNamedOutputsList(JobConf conf) {
  193.     List<String> names = new ArrayList<String>();
  194.     StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " ");
  195.     while (st.hasMoreTokens()) {
  196.       names.add(st.nextToken());
  197.     }
  198.     return names;
  199.   }
  200.   /**
  201.    * Returns if a named output is multiple.
  202.    *
  203.    * @param conf        job conf
  204.    * @param namedOutput named output
  205.    * @return <code>true</code> if the name output is multi, <code>false</code>
  206.    *         if it is single. If the name output is not defined it returns
  207.    *         <code>false</code>
  208.    */
  209.   public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
  210.     checkNamedOutput(conf, namedOutput, false);
  211.     return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
  212.   }
  213.   /**
  214.    * Returns the named output OutputFormat.
  215.    *
  216.    * @param conf        job conf
  217.    * @param namedOutput named output
  218.    * @return namedOutput OutputFormat
  219.    */
  220.   public static Class<? extends OutputFormat> getNamedOutputFormatClass(
  221.     JobConf conf, String namedOutput) {
  222.     checkNamedOutput(conf, namedOutput, false);
  223.     return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
  224.       OutputFormat.class);
  225.   }
  226.   /**
  227.    * Returns the key class for a named output.
  228.    *
  229.    * @param conf        job conf
  230.    * @param namedOutput named output
  231.    * @return class for the named output key
  232.    */
  233.   public static Class<? extends WritableComparable> getNamedOutputKeyClass(JobConf conf,
  234.                                                 String namedOutput) {
  235.     checkNamedOutput(conf, namedOutput, false);
  236.     return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
  237. WritableComparable.class);
  238.   }
  239.   /**
  240.    * Returns the value class for a named output.
  241.    *
  242.    * @param conf        job conf
  243.    * @param namedOutput named output
  244.    * @return class of named output value
  245.    */
  246.   public static Class<? extends Writable> getNamedOutputValueClass(JobConf conf,
  247.                                                   String namedOutput) {
  248.     checkNamedOutput(conf, namedOutput, false);
  249.     return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
  250.       Writable.class);
  251.   }
  252.   /**
  253.    * Adds a named output for the job.
  254.    * <p/>
  255.    *
  256.    * @param conf              job conf to add the named output
  257.    * @param namedOutput       named output name, it has to be a word, letters
  258.    *                          and numbers only, cannot be the word 'part' as
  259.    *                          that is reserved for the
  260.    *                          default output.
  261.    * @param outputFormatClass OutputFormat class.
  262.    * @param keyClass          key class
  263.    * @param valueClass        value class
  264.    */
  265.   public static void addNamedOutput(JobConf conf, String namedOutput,
  266.                                 Class<? extends OutputFormat> outputFormatClass,
  267.                                 Class<?> keyClass, Class<?> valueClass) {
  268.     addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass,
  269.       valueClass);
  270.   }
  271.   /**
  272.    * Adds a multi named output for the job.
  273.    * <p/>
  274.    *
  275.    * @param conf              job conf to add the named output
  276.    * @param namedOutput       named output name, it has to be a word, letters
  277.    *                          and numbers only, cannot be the word 'part' as
  278.    *                          that is reserved for the
  279.    *                          default output.
  280.    * @param outputFormatClass OutputFormat class.
  281.    * @param keyClass          key class
  282.    * @param valueClass        value class
  283.    */
  284.   public static void addMultiNamedOutput(JobConf conf, String namedOutput,
  285.                                Class<? extends OutputFormat> outputFormatClass,
  286.                                Class<?> keyClass, Class<?> valueClass) {
  287.     addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass,
  288.       valueClass);
  289.   }
  290.   /**
  291.    * Adds a named output for the job.
  292.    * <p/>
  293.    *
  294.    * @param conf              job conf to add the named output
  295.    * @param namedOutput       named output name, it has to be a word, letters
  296.    *                          and numbers only, cannot be the word 'part' as
  297.    *                          that is reserved for the
  298.    *                          default output.
  299.    * @param multi             indicates if the named output is multi
  300.    * @param outputFormatClass OutputFormat class.
  301.    * @param keyClass          key class
  302.    * @param valueClass        value class
  303.    */
  304.   private static void addNamedOutput(JobConf conf, String namedOutput,
  305.                                boolean multi,
  306.                                Class<? extends OutputFormat> outputFormatClass,
  307.                                Class<?> keyClass, Class<?> valueClass) {
  308.     checkNamedOutputName(namedOutput);
  309.     checkNamedOutput(conf, namedOutput, true);
  310.     conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
  311.     conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
  312.       OutputFormat.class);
  313.     conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
  314.     conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  315.     conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
  316.   }
  317.   /**
  318.    * Enables or disables counters for the named outputs.
  319.    * <p/>
  320.    * By default these counters are disabled.
  321.    * <p/>
  322.    * MultipleOutputs supports counters, by default the are disabled.
  323.    * The counters group is the {@link MultipleOutputs} class name.
  324.    * </p>
  325.    * The names of the counters are the same as the named outputs. For multi
  326.    * named outputs the name of the counter is the concatenation of the named
  327.    * output, and underscore '_' and the multiname.
  328.    *
  329.    * @param conf    job conf to enableadd the named output.
  330.    * @param enabled indicates if the counters will be enabled or not.
  331.    */
  332.   public static void setCountersEnabled(JobConf conf, boolean enabled) {
  333.     conf.setBoolean(COUNTERS_ENABLED, enabled);
  334.   }
  335.   /**
  336.    * Returns if the counters for the named outputs are enabled or not.
  337.    * <p/>
  338.    * By default these counters are disabled.
  339.    * <p/>
  340.    * MultipleOutputs supports counters, by default the are disabled.
  341.    * The counters group is the {@link MultipleOutputs} class name.
  342.    * </p>
  343.    * The names of the counters are the same as the named outputs. For multi
  344.    * named outputs the name of the counter is the concatenation of the named
  345.    * output, and underscore '_' and the multiname.
  346.    *
  347.    *
  348.    * @param conf    job conf to enableadd the named output.
  349.    * @return TRUE if the counters are enabled, FALSE if they are disabled.
  350.    */
  351.   public static boolean getCountersEnabled(JobConf conf) {
  352.     return conf.getBoolean(COUNTERS_ENABLED, false);
  353.   }
  354.   // instance code, to be used from Mapper/Reducer code
  355.   private JobConf conf;
  356.   private OutputFormat outputFormat;
  357.   private Set<String> namedOutputs;
  358.   private Map<String, RecordWriter> recordWriters;
  359.   private boolean countersEnabled;
  360.   /**
  361.    * Creates and initializes multiple named outputs support, it should be
  362.    * instantiated in the Mapper/Reducer configure method.
  363.    *
  364.    * @param job the job configuration object
  365.    */
  366.   public MultipleOutputs(JobConf job) {
  367.     this.conf = job;
  368.     outputFormat = new InternalFileOutputFormat();
  369.     namedOutputs = Collections.unmodifiableSet(
  370.       new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
  371.     recordWriters = new HashMap<String, RecordWriter>();
  372.     countersEnabled = getCountersEnabled(job);
  373.   }
  374.   /**
  375.    * Returns iterator with the defined name outputs.
  376.    *
  377.    * @return iterator with the defined named outputs
  378.    */
  379.   public Iterator<String> getNamedOutputs() {
  380.     return namedOutputs.iterator();
  381.   }
  382.   // by being synchronized MultipleOutputTask can be use with a
  383.   // MultithreaderMapRunner.
  384.   private synchronized RecordWriter getRecordWriter(String namedOutput,
  385.                                                     String baseFileName,
  386.                                                     final Reporter reporter)
  387.     throws IOException {
  388.     RecordWriter writer = recordWriters.get(baseFileName);
  389.     if (writer == null) {
  390.       if (countersEnabled && reporter == null) {
  391.         throw new IllegalArgumentException(
  392.           "Counters are enabled, Reporter cannot be NULL");
  393.       }
  394.       JobConf jobConf = new JobConf(conf);
  395.       jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
  396.       FileSystem fs = FileSystem.get(conf);
  397.       writer =
  398.         outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
  399.       if (countersEnabled) {
  400.         if (reporter == null) {
  401.           throw new IllegalArgumentException(
  402.             "Counters are enabled, Reporter cannot be NULL");
  403.         }
  404.         writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
  405.       }
  406.       recordWriters.put(baseFileName, writer);
  407.     }
  408.     return writer;
  409.   }
  410.   private static class RecordWriterWithCounter implements RecordWriter {
  411.     private RecordWriter writer;
  412.     private String counterName;
  413.     private Reporter reporter;
  414.     public RecordWriterWithCounter(RecordWriter writer, String counterName,
  415.                                    Reporter reporter) {
  416.       this.writer = writer;
  417.       this.counterName = counterName;
  418.       this.reporter = reporter;
  419.     }
  420.     @SuppressWarnings({"unchecked"})
  421.     public void write(Object key, Object value) throws IOException {
  422.       reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
  423.       writer.write(key, value);
  424.     }
  425.     public void close(Reporter reporter) throws IOException {
  426.       writer.close(reporter);
  427.     }
  428.   }
  429.   /**
  430.    * Gets the output collector for a named output.
  431.    * <p/>
  432.    *
  433.    * @param namedOutput the named output name
  434.    * @param reporter    the reporter
  435.    * @return the output collector for the given named output
  436.    * @throws IOException thrown if output collector could not be created
  437.    */
  438.   @SuppressWarnings({"unchecked"})
  439.   public OutputCollector getCollector(String namedOutput, Reporter reporter)
  440.     throws IOException {
  441.     return getCollector(namedOutput, null, reporter);
  442.   }
  443.   /**
  444.    * Gets the output collector for a multi named output.
  445.    * <p/>
  446.    *
  447.    * @param namedOutput the named output name
  448.    * @param multiName   the multi name part
  449.    * @param reporter    the reporter
  450.    * @return the output collector for the given named output
  451.    * @throws IOException thrown if output collector could not be created
  452.    */
  453.   @SuppressWarnings({"unchecked"})
  454.   public OutputCollector getCollector(String namedOutput, String multiName,
  455.                                       Reporter reporter)
  456.     throws IOException {
  457.     checkNamedOutputName(namedOutput);
  458.     if (!namedOutputs.contains(namedOutput)) {
  459.       throw new IllegalArgumentException("Undefined named output '" +
  460.         namedOutput + "'");
  461.     }
  462.     boolean multi = isMultiNamedOutput(conf, namedOutput);
  463.     if (!multi && multiName != null) {
  464.       throw new IllegalArgumentException("Name output '" + namedOutput +
  465.         "' has not been defined as multi");
  466.     }
  467.     if (multi) {
  468.       checkTokenName(multiName);
  469.     }
  470.     String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput;
  471.     final RecordWriter writer =
  472.       getRecordWriter(namedOutput, baseFileName, reporter);
  473.     return new OutputCollector() {
  474.       @SuppressWarnings({"unchecked"})
  475.       public void collect(Object key, Object value) throws IOException {
  476.         writer.write(key, value);
  477.       }
  478.     };
  479.   }
  480.   /**
  481.    * Closes all the opened named outputs.
  482.    * <p/>
  483.    * If overriden subclasses must invoke <code>super.close()</code> at the
  484.    * end of their <code>close()</code>
  485.    *
  486.    * @throws java.io.IOException thrown if any of the MultipleOutput files
  487.    *                             could not be closed properly.
  488.    */
  489.   public void close() throws IOException {
  490.     for (RecordWriter writer : recordWriters.values()) {
  491.       writer.close(null);
  492.     }
  493.   }
  494.   private static class InternalFileOutputFormat extends
  495.     FileOutputFormat<Object, Object> {
  496.     public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
  497.     @SuppressWarnings({"unchecked"})
  498.     public RecordWriter<Object, Object> getRecordWriter(
  499.       FileSystem fs, JobConf job, String baseFileName, Progressable progress)
  500.       throws IOException {
  501.       String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
  502.       String fileName = getUniqueName(job, baseFileName);
  503.       // The following trick leverages the instantiation of a record writer via
  504.       // the job conf thus supporting arbitrary output formats.
  505.       JobConf outputConf = new JobConf(job);
  506.       outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
  507.       outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput));
  508.       outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput));
  509.       OutputFormat outputFormat = outputConf.getOutputFormat();
  510.       return outputFormat.getRecordWriter(fs, outputConf, fileName, progress);
  511.     }
  512.   }
  513. }