CompositeInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.join;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Map;
  22. import java.util.regex.Matcher;
  23. import java.util.regex.Pattern;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.io.WritableComparable;
  26. import org.apache.hadoop.mapred.InputFormat;
  27. import org.apache.hadoop.mapred.InputSplit;
  28. import org.apache.hadoop.mapred.JobConf;
  29. import org.apache.hadoop.mapred.Reporter;
  30. /**
  31.  * An InputFormat capable of performing joins over a set of data sources sorted
  32.  * and partitioned the same way.
  33.  * @see #setFormat
  34.  *
  35.  * A user may define new join types by setting the property
  36.  * <tt>mapred.join.define.&lt;ident&gt;</tt> to a classname. In the expression
  37.  * <tt>mapred.join.expr</tt>, the identifier will be assumed to be a
  38.  * ComposableRecordReader.
  39.  * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
  40.  * in the join.
  41.  * @see JoinRecordReader
  42.  * @see MultiFilterRecordReader
  43.  */
  44. public class CompositeInputFormat<K extends WritableComparable>
  45.       implements ComposableInputFormat<K,TupleWritable> {
  46.   // expression parse tree to which IF requests are proxied
  47.   private Parser.Node root;
  48.   public CompositeInputFormat() { }
  49.   /**
  50.    * Interpret a given string as a composite expression.
  51.    * {@code
  52.    *   func  ::= <ident>([<func>,]*<func>)
  53.    *   func  ::= tbl(<class>,"<path>")
  54.    *   class ::= @see java.lang.Class#forName(java.lang.String)
  55.    *   path  ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
  56.    * }
  57.    * Reads expression from the <tt>mapred.join.expr</tt> property and
  58.    * user-supplied join types from <tt>mapred.join.define.&lt;ident&gt;</tt>
  59.    *  types. Paths supplied to <tt>tbl</tt> are given as input paths to the
  60.    * InputFormat class listed.
  61.    * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
  62.    */
  63.   public void setFormat(JobConf job) throws IOException {
  64.     addDefaults();
  65.     addUserIdentifiers(job);
  66.     root = Parser.parse(job.get("mapred.join.expr", null), job);
  67.   }
  68.   /**
  69.    * Adds the default set of identifiers to the parser.
  70.    */
  71.   protected void addDefaults() {
  72.     try {
  73.       Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class);
  74.       Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class);
  75.       Parser.CNode.addIdentifier("override", OverrideRecordReader.class);
  76.       Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class);
  77.     } catch (NoSuchMethodException e) {
  78.       throw new RuntimeException("FATAL: Failed to init defaults", e);
  79.     }
  80.   }
  81.   /**
  82.    * Inform the parser of user-defined types.
  83.    */
  84.   private void addUserIdentifiers(JobConf job) throws IOException {
  85.     Pattern x = Pattern.compile("^mapred\.join\.define\.(\w+)$");
  86.     for (Map.Entry<String,String> kv : job) {
  87.       Matcher m = x.matcher(kv.getKey());
  88.       if (m.matches()) {
  89.         try {
  90.           Parser.CNode.addIdentifier(m.group(1),
  91.               job.getClass(m.group(0), null, ComposableRecordReader.class));
  92.         } catch (NoSuchMethodException e) {
  93.           throw (IOException)new IOException(
  94.               "Invalid define for " + m.group(1)).initCause(e);
  95.         }
  96.       }
  97.     }
  98.   }
  99.   /**
  100.    * Build a CompositeInputSplit from the child InputFormats by assigning the
  101.    * ith split from each child to the ith composite split.
  102.    */
  103.   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  104.     setFormat(job);
  105.     job.setLong("mapred.min.split.size", Long.MAX_VALUE);
  106.     return root.getSplits(job, numSplits);
  107.   }
  108.   /**
  109.    * Construct a CompositeRecordReader for the children of this InputFormat
  110.    * as defined in the init expression.
  111.    * The outermost join need only be composable, not necessarily a composite.
  112.    * Mandating TupleWritable isn't strictly correct.
  113.    */
  114.   @SuppressWarnings("unchecked") // child types unknown
  115.   public ComposableRecordReader<K,TupleWritable> getRecordReader(
  116.       InputSplit split, JobConf job, Reporter reporter) throws IOException {
  117.     setFormat(job);
  118.     return root.getRecordReader(split, job, reporter);
  119.   }
  120.   /**
  121.    * Convenience method for constructing composite formats.
  122.    * Given InputFormat class (inf), path (p) return:
  123.    * {@code tbl(<inf>, <p>) }
  124.    */
  125.   public static String compose(Class<? extends InputFormat> inf, String path) {
  126.     return compose(inf.getName().intern(), path, new StringBuffer()).toString();
  127.   }
  128.   /**
  129.    * Convenience method for constructing composite formats.
  130.    * Given operation (op), Object class (inf), set of paths (p) return:
  131.    * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
  132.    */
  133.   public static String compose(String op, Class<? extends InputFormat> inf,
  134.       String... path) {
  135.     final String infname = inf.getName();
  136.     StringBuffer ret = new StringBuffer(op + '(');
  137.     for (String p : path) {
  138.       compose(infname, p, ret);
  139.       ret.append(',');
  140.     }
  141.     ret.setCharAt(ret.length() - 1, ')');
  142.     return ret.toString();
  143.   }
  144.   /**
  145.    * Convenience method for constructing composite formats.
  146.    * Given operation (op), Object class (inf), set of paths (p) return:
  147.    * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
  148.    */
  149.   public static String compose(String op, Class<? extends InputFormat> inf,
  150.       Path... path) {
  151.     ArrayList<String> tmp = new ArrayList<String>(path.length);
  152.     for (Path p : path) {
  153.       tmp.add(p.toString());
  154.     }
  155.     return compose(op, inf, tmp.toArray(new String[0]));
  156.   }
  157.   private static StringBuffer compose(String inf, String path,
  158.       StringBuffer sb) {
  159.     sb.append("tbl(" + inf + ","");
  160.     sb.append(path);
  161.     sb.append("")");
  162.     return sb;
  163.   }
  164. }