Parser.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:16k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.join;
  19. import java.io.CharArrayReader;
  20. import java.io.IOException;
  21. import java.io.StreamTokenizer;
  22. import java.lang.reflect.Constructor;
  23. import java.lang.reflect.InvocationTargetException;
  24. import java.util.ArrayList;
  25. import java.util.HashMap;
  26. import java.util.Iterator;
  27. import java.util.LinkedList;
  28. import java.util.List;
  29. import java.util.ListIterator;
  30. import java.util.Map;
  31. import java.util.Stack;
  32. import org.apache.hadoop.fs.Path;
  33. import org.apache.hadoop.io.WritableComparator;
  34. import org.apache.hadoop.mapred.FileInputFormat;
  35. import org.apache.hadoop.mapred.InputFormat;
  36. import org.apache.hadoop.mapred.InputSplit;
  37. import org.apache.hadoop.mapred.JobConf;
  38. import org.apache.hadoop.mapred.RecordReader;
  39. import org.apache.hadoop.mapred.Reporter;
  40. import org.apache.hadoop.util.ReflectionUtils;
  41. /**
  42.  * Very simple shift-reduce parser for join expressions.
  43.  *
  44.  * This should be sufficient for the user extension permitted now, but ought to
  45.  * be replaced with a parser generator if more complex grammars are supported.
  46.  * In particular, this "shift-reduce" parser has no states. Each set
  47.  * of formals requires a different internal node type, which is responsible for
  48.  * interpreting the list of tokens it receives. This is sufficient for the
  49.  * current grammar, but it has several annoying properties that might inhibit
  50.  * extension. In particular, parenthesis are always function calls; an
  51.  * algebraic or filter grammar would not only require a node type, but must
  52.  * also work around the internals of this parser.
  53.  *
  54.  * For most other cases, adding classes to the hierarchy- particularly by
  55.  * extending JoinRecordReader and MultiFilterRecordReader- is fairly
  56.  * straightforward. One need only override the relevant method(s) (usually only
  57.  * {@link CompositeRecordReader#combine}) and include a property to map its
  58.  * value to an identifier in the parser.
  59.  */
  60. public class Parser {
  61.   public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
  62.   /**
  63.    * Tagged-union type for tokens from the join expression.
  64.    * @see Parser.TType
  65.    */
  66.   public static class Token {
  67.     private TType type;
  68.     Token(TType type) {
  69.       this.type = type;
  70.     }
  71.     public TType getType() { return type; }
  72.     public Node getNode() throws IOException {
  73.       throw new IOException("Expected nodetype");
  74.     }
  75.     public double getNum() throws IOException {
  76.       throw new IOException("Expected numtype");
  77.     }
  78.     public String getStr() throws IOException {
  79.       throw new IOException("Expected strtype");
  80.     }
  81.   }
  82.   public static class NumToken extends Token {
  83.     private double num;
  84.     public NumToken(double num) {
  85.       super(TType.NUM);
  86.       this.num = num;
  87.     }
  88.     public double getNum() { return num; }
  89.   }
  90.   public static class NodeToken extends Token {
  91.     private Node node;
  92.     NodeToken(Node node) {
  93.       super(TType.CIF);
  94.       this.node = node;
  95.     }
  96.     public Node getNode() {
  97.       return node;
  98.     }
  99.   }
  100.   public static class StrToken extends Token {
  101.     private String str;
  102.     public StrToken(TType type, String str) {
  103.       super(type);
  104.       this.str = str;
  105.     }
  106.     public String getStr() {
  107.       return str;
  108.     }
  109.   }
  110.   /**
  111.    * Simple lexer wrapping a StreamTokenizer.
  112.    * This encapsulates the creation of tagged-union Tokens and initializes the
  113.    * SteamTokenizer.
  114.    */
  115.   private static class Lexer {
  116.     private StreamTokenizer tok;
  117.     Lexer(String s) {
  118.       tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
  119.       tok.quoteChar('"');
  120.       tok.parseNumbers();
  121.       tok.ordinaryChar(',');
  122.       tok.ordinaryChar('(');
  123.       tok.ordinaryChar(')');
  124.       tok.wordChars('$','$');
  125.       tok.wordChars('_','_');
  126.     }
  127.     Token next() throws IOException {
  128.       int type = tok.nextToken();
  129.       switch (type) {
  130.         case StreamTokenizer.TT_EOF:
  131.         case StreamTokenizer.TT_EOL:
  132.           return null;
  133.         case StreamTokenizer.TT_NUMBER:
  134.           return new NumToken(tok.nval);
  135.         case StreamTokenizer.TT_WORD:
  136.           return new StrToken(TType.IDENT, tok.sval);
  137.         case '"':
  138.           return new StrToken(TType.QUOT, tok.sval);
  139.         default:
  140.           switch (type) {
  141.             case ',':
  142.               return new Token(TType.COMMA);
  143.             case '(':
  144.               return new Token(TType.LPAREN);
  145.             case ')':
  146.               return new Token(TType.RPAREN);
  147.             default:
  148.               throw new IOException("Unexpected: " + type);
  149.           }
  150.       }
  151.     }
  152.   }
  153.   public abstract static class Node implements ComposableInputFormat {
  154.     /**
  155.      * Return the node type registered for the particular identifier.
  156.      * By default, this is a CNode for any composite node and a WNode
  157.      * for "wrapped" nodes. User nodes will likely be composite
  158.      * nodes.
  159.      * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
  160.      * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
  161.      */
  162.     static Node forIdent(String ident) throws IOException {
  163.       try {
  164.         if (!nodeCstrMap.containsKey(ident)) {
  165.           throw new IOException("No nodetype for " + ident);
  166.         }
  167.         return nodeCstrMap.get(ident).newInstance(ident);
  168.       } catch (IllegalAccessException e) {
  169.         throw (IOException)new IOException().initCause(e);
  170.       } catch (InstantiationException e) {
  171.         throw (IOException)new IOException().initCause(e);
  172.       } catch (InvocationTargetException e) {
  173.         throw (IOException)new IOException().initCause(e);
  174.       }
  175.     }
  176.     private static final Class<?>[] ncstrSig = { String.class };
  177.     private static final
  178.         Map<String,Constructor<? extends Node>> nodeCstrMap =
  179.         new HashMap<String,Constructor<? extends Node>>();
  180.     protected static final
  181.         Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
  182.         new HashMap<String,Constructor<? extends ComposableRecordReader>>();
  183.     /**
  184.      * For a given identifier, add a mapping to the nodetype for the parse
  185.      * tree and to the ComposableRecordReader to be created, including the
  186.      * formals required to invoke the constructor.
  187.      * The nodetype and constructor signature should be filled in from the
  188.      * child node.
  189.      */
  190.     protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
  191.                               Class<? extends Node> nodetype,
  192.                               Class<? extends ComposableRecordReader> cl)
  193.         throws NoSuchMethodException {
  194.       Constructor<? extends Node> ncstr =
  195.         nodetype.getDeclaredConstructor(ncstrSig);
  196.       ncstr.setAccessible(true);
  197.       nodeCstrMap.put(ident, ncstr);
  198.       Constructor<? extends ComposableRecordReader> mcstr =
  199.         cl.getDeclaredConstructor(mcstrSig);
  200.       mcstr.setAccessible(true);
  201.       rrCstrMap.put(ident, mcstr);
  202.     }
  203.     // inst
  204.     protected int id = -1;
  205.     protected String ident;
  206.     protected Class<? extends WritableComparator> cmpcl;
  207.     protected Node(String ident) {
  208.       this.ident = ident;
  209.     }
  210.     protected void setID(int id) {
  211.       this.id = id;
  212.     }
  213.     protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
  214.       this.cmpcl = cmpcl;
  215.     }
  216.     abstract void parse(List<Token> args, JobConf job) throws IOException;
  217.   }
  218.   /**
  219.    * Nodetype in the parse tree for &quot;wrapped&quot; InputFormats.
  220.    */
  221.   static class WNode extends Node {
  222.     private static final Class<?>[] cstrSig =
  223.       { Integer.TYPE, RecordReader.class, Class.class };
  224.     static void addIdentifier(String ident,
  225.                               Class<? extends ComposableRecordReader> cl)
  226.         throws NoSuchMethodException {
  227.       Node.addIdentifier(ident, cstrSig, WNode.class, cl);
  228.     }
  229.     private String indir;
  230.     private InputFormat inf;
  231.     public WNode(String ident) {
  232.       super(ident);
  233.     }
  234.     /**
  235.      * Let the first actual define the InputFormat and the second define
  236.      * the <tt>mapred.input.dir</tt> property.
  237.      */
  238.     public void parse(List<Token> ll, JobConf job) throws IOException {
  239.       StringBuilder sb = new StringBuilder();
  240.       Iterator<Token> i = ll.iterator();
  241.       while (i.hasNext()) {
  242.         Token t = i.next();
  243.         if (TType.COMMA.equals(t.getType())) {
  244.           try {
  245.            inf = (InputFormat)ReflectionUtils.newInstance(
  246.            job.getClassByName(sb.toString()),
  247.                 job);
  248.           } catch (ClassNotFoundException e) {
  249.             throw (IOException)new IOException().initCause(e);
  250.           } catch (IllegalArgumentException e) {
  251.             throw (IOException)new IOException().initCause(e);
  252.           }
  253.           break;
  254.         }
  255.         sb.append(t.getStr());
  256.       }
  257.       if (!i.hasNext()) {
  258.         throw new IOException("Parse error");
  259.       }
  260.       Token t = i.next();
  261.       if (!TType.QUOT.equals(t.getType())) {
  262.         throw new IOException("Expected quoted string");
  263.       }
  264.       indir = t.getStr();
  265.       // no check for ll.isEmpty() to permit extension
  266.     }
  267.     private JobConf getConf(JobConf job) {
  268.       JobConf conf = new JobConf(job);
  269.       FileInputFormat.setInputPaths(conf, indir);
  270.       return conf;
  271.     }
  272.     public InputSplit[] getSplits(JobConf job, int numSplits)
  273.         throws IOException {
  274.       return inf.getSplits(getConf(job), numSplits);
  275.     }
  276.     public ComposableRecordReader getRecordReader(
  277.         InputSplit split, JobConf job, Reporter reporter) throws IOException {
  278.       try {
  279.         if (!rrCstrMap.containsKey(ident)) {
  280.           throw new IOException("No RecordReader for " + ident);
  281.         }
  282.         return rrCstrMap.get(ident).newInstance(id,
  283.             inf.getRecordReader(split, getConf(job), reporter), cmpcl);
  284.       } catch (IllegalAccessException e) {
  285.         throw (IOException)new IOException().initCause(e);
  286.       } catch (InstantiationException e) {
  287.         throw (IOException)new IOException().initCause(e);
  288.       } catch (InvocationTargetException e) {
  289.         throw (IOException)new IOException().initCause(e);
  290.       }
  291.     }
  292.     public String toString() {
  293.       return ident + "(" + inf.getClass().getName() + ","" + indir + "")";
  294.     }
  295.   }
  296.   /**
  297.    * Internal nodetype for &quot;composite&quot; InputFormats.
  298.    */
  299.   static class CNode extends Node {
  300.     private static final Class<?>[] cstrSig =
  301.       { Integer.TYPE, JobConf.class, Integer.TYPE, Class.class };
  302.     static void addIdentifier(String ident,
  303.                               Class<? extends ComposableRecordReader> cl)
  304.         throws NoSuchMethodException {
  305.       Node.addIdentifier(ident, cstrSig, CNode.class, cl);
  306.     }
  307.     // inst
  308.     private ArrayList<Node> kids = new ArrayList<Node>();
  309.     public CNode(String ident) {
  310.       super(ident);
  311.     }
  312.     public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
  313.       super.setKeyComparator(cmpcl);
  314.       for (Node n : kids) {
  315.         n.setKeyComparator(cmpcl);
  316.       }
  317.     }
  318.     /**
  319.      * Combine InputSplits from child InputFormats into a
  320.      * {@link CompositeInputSplit}.
  321.      */
  322.     public InputSplit[] getSplits(JobConf job, int numSplits)
  323.         throws IOException {
  324.       InputSplit[][] splits = new InputSplit[kids.size()][];
  325.       for (int i = 0; i < kids.size(); ++i) {
  326.         final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
  327.         if (null == tmp) {
  328.           throw new IOException("Error gathering splits from child RReader");
  329.         }
  330.         if (i > 0 && splits[i-1].length != tmp.length) {
  331.           throw new IOException("Inconsistent split cardinality from child " +
  332.               i + " (" + splits[i-1].length + "/" + tmp.length + ")");
  333.         }
  334.         splits[i] = tmp;
  335.       }
  336.       final int size = splits[0].length;
  337.       CompositeInputSplit[] ret = new CompositeInputSplit[size];
  338.       for (int i = 0; i < size; ++i) {
  339.         ret[i] = new CompositeInputSplit(splits.length);
  340.         for (int j = 0; j < splits.length; ++j) {
  341.           ret[i].add(splits[j][i]);
  342.         }
  343.       }
  344.       return ret;
  345.     }
  346.     @SuppressWarnings("unchecked") // child types unknowable
  347.     public ComposableRecordReader getRecordReader(
  348.         InputSplit split, JobConf job, Reporter reporter) throws IOException {
  349.       if (!(split instanceof CompositeInputSplit)) {
  350.         throw new IOException("Invalid split type:" +
  351.                               split.getClass().getName());
  352.       }
  353.       final CompositeInputSplit spl = (CompositeInputSplit)split;
  354.       final int capacity = kids.size();
  355.       CompositeRecordReader ret = null;
  356.       try {
  357.         if (!rrCstrMap.containsKey(ident)) {
  358.           throw new IOException("No RecordReader for " + ident);
  359.         }
  360.         ret = (CompositeRecordReader)
  361.           rrCstrMap.get(ident).newInstance(id, job, capacity, cmpcl);
  362.       } catch (IllegalAccessException e) {
  363.         throw (IOException)new IOException().initCause(e);
  364.       } catch (InstantiationException e) {
  365.         throw (IOException)new IOException().initCause(e);
  366.       } catch (InvocationTargetException e) {
  367.         throw (IOException)new IOException().initCause(e);
  368.       }
  369.       for (int i = 0; i < capacity; ++i) {
  370.         ret.add(kids.get(i).getRecordReader(spl.get(i), job, reporter));
  371.       }
  372.       return (ComposableRecordReader)ret;
  373.     }
  374.     /**
  375.      * Parse a list of comma-separated nodes.
  376.      */
  377.     public void parse(List<Token> args, JobConf job) throws IOException {
  378.       ListIterator<Token> i = args.listIterator();
  379.       while (i.hasNext()) {
  380.         Token t = i.next();
  381.         t.getNode().setID(i.previousIndex() >> 1);
  382.         kids.add(t.getNode());
  383.         if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
  384.           throw new IOException("Expected ','");
  385.         }
  386.       }
  387.     }
  388.     public String toString() {
  389.       StringBuilder sb = new StringBuilder();
  390.       sb.append(ident + "(");
  391.       for (Node n : kids) {
  392.         sb.append(n.toString() + ",");
  393.       }
  394.       sb.setCharAt(sb.length() - 1, ')');
  395.       return sb.toString();
  396.     }
  397.   }
  398.   private static Token reduce(Stack<Token> st, JobConf job) throws IOException {
  399.     LinkedList<Token> args = new LinkedList<Token>();
  400.     while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
  401.       args.addFirst(st.pop());
  402.     }
  403.     if (st.isEmpty()) {
  404.       throw new IOException("Unmatched ')'");
  405.     }
  406.     st.pop();
  407.     if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
  408.       throw new IOException("Identifier expected");
  409.     }
  410.     Node n = Node.forIdent(st.pop().getStr());
  411.     n.parse(args, job);
  412.     return new NodeToken(n);
  413.   }
  414.   /**
  415.    * Given an expression and an optional comparator, build a tree of
  416.    * InputFormats using the comparator to sort keys.
  417.    */
  418.   static Node parse(String expr, JobConf job) throws IOException {
  419.     if (null == expr) {
  420.       throw new IOException("Expression is null");
  421.     }
  422.     Class<? extends WritableComparator> cmpcl =
  423.       job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
  424.     Lexer lex = new Lexer(expr);
  425.     Stack<Token> st = new Stack<Token>();
  426.     Token tok;
  427.     while ((tok = lex.next()) != null) {
  428.       if (TType.RPAREN.equals(tok.getType())) {
  429.         st.push(reduce(st, job));
  430.       } else {
  431.         st.push(tok);
  432.       }
  433.     }
  434.     if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
  435.       Node ret = st.pop().getNode();
  436.       if (cmpcl != null) {
  437.         ret.setKeyComparator(cmpcl);
  438.       }
  439.       return ret;
  440.     }
  441.     throw new IOException("Missing ')'");
  442.   }
  443. }