StreamXmlRecordReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.io.*;
  20. import java.util.regex.*;
  21. import org.apache.hadoop.io.DataOutputBuffer;
  22. import org.apache.hadoop.io.Writable;
  23. import org.apache.hadoop.io.Text;
  24. import org.apache.hadoop.io.WritableComparable;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.FSDataInputStream;
  27. import org.apache.hadoop.mapred.Reporter;
  28. import org.apache.hadoop.mapred.FileSplit;
  29. import org.apache.hadoop.mapred.JobConf;
  30. /** A way to interpret XML fragments as Mapper input records.
  31.  *  Values are XML subtrees delimited by configurable tags.
  32.  *  Keys could be the value of a certain attribute in the XML subtree, 
  33.  *  but this is left to the stream processor application.
  34.  *
  35.  *  The name-value properties that StreamXmlRecordReader understands are:
  36.  *    String begin (chars marking beginning of record)
  37.  *    String end   (chars marking end of record)
  38.  *    int maxrec   (maximum record size)
  39.  *    int lookahead(maximum lookahead to sync CDATA)
  40.  *    boolean slowmatch
  41.  */
  42. public class StreamXmlRecordReader extends StreamBaseRecordReader {
  43.   public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
  44.                                JobConf job, FileSystem fs) throws IOException {
  45.     super(in, split, reporter, job, fs);
  46.     beginMark_ = checkJobGet(CONF_NS + "begin");
  47.     endMark_ = checkJobGet(CONF_NS + "end");
  48.     maxRecSize_ = job_.getInt(CONF_NS + "maxrec", 50 * 1000);
  49.     lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
  50.     synched_ = false;
  51.     slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false);
  52.     if (slowMatch_) {
  53.       beginPat_ = makePatternCDataOrMark(beginMark_);
  54.       endPat_ = makePatternCDataOrMark(endMark_);
  55.     }
  56.     init();
  57.   }
  58.   public void init() throws IOException {
  59.     LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
  60.              + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
  61.              + in_.getPos());
  62.     if (start_ > in_.getPos()) {
  63.       in_.seek(start_);
  64.     }
  65.     pos_ = start_;
  66.     bin_ = new BufferedInputStream(in_);
  67.     seekNextRecordBoundary();
  68.   }
  69.   
  70.   int numNext = 0;
  71.   public synchronized boolean next(Text key, Text value) throws IOException {
  72.     numNext++;
  73.     if (pos_ >= end_) {
  74.       return false;
  75.     }
  76.     DataOutputBuffer buf = new DataOutputBuffer();
  77.     if (!readUntilMatchBegin()) {
  78.       return false;
  79.     }
  80.     if (!readUntilMatchEnd(buf)) {
  81.       return false;
  82.     }
  83.     // There is only one elem..key/value splitting is not done here.
  84.     byte[] record = new byte[buf.getLength()];
  85.     System.arraycopy(buf.getData(), 0, record, 0, record.length);
  86.     numRecStats(record, 0, record.length);
  87.     key.set(record);
  88.     value.set("");
  89.     return true;
  90.   }
  91.   public void seekNextRecordBoundary() throws IOException {
  92.     readUntilMatchBegin();
  93.   }
  94.   boolean readUntilMatchBegin() throws IOException {
  95.     if (slowMatch_) {
  96.       return slowReadUntilMatch(beginPat_, false, null);
  97.     } else {
  98.       return fastReadUntilMatch(beginMark_, false, null);
  99.     }
  100.   }
  101.   private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException {
  102.     if (slowMatch_) {
  103.       return slowReadUntilMatch(endPat_, true, buf);
  104.     } else {
  105.       return fastReadUntilMatch(endMark_, true, buf);
  106.     }
  107.   }
  108.   private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
  109.                                      DataOutputBuffer outBufOrNull) throws IOException {
  110.     byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
  111.     int read = 0;
  112.     bin_.mark(Math.max(lookAhead_, maxRecSize_) + 2); //mark to invalidate if we read more
  113.     read = bin_.read(buf);
  114.     if (read == -1) return false;
  115.     String sbuf = new String(buf, 0, read, "UTF-8");
  116.     Matcher match = markPattern.matcher(sbuf);
  117.     firstMatchStart_ = NA;
  118.     firstMatchEnd_ = NA;
  119.     int bufPos = 0;
  120.     int state = synched_ ? CDATA_OUT : CDATA_UNK;
  121.     int s = 0;
  122.     while (match.find(bufPos)) {
  123.       int input;
  124.       if (match.group(1) != null) {
  125.         input = CDATA_BEGIN;
  126.       } else if (match.group(2) != null) {
  127.         input = CDATA_END;
  128.         firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
  129.       } else {
  130.         input = RECORD_MAYBE;
  131.       }
  132.       if (input == RECORD_MAYBE) {
  133.         if (firstMatchStart_ == NA) {
  134.           firstMatchStart_ = match.start();
  135.           firstMatchEnd_ = match.end();
  136.         }
  137.       }
  138.       state = nextState(state, input, match.start());
  139.       if (state == RECORD_ACCEPT) {
  140.         break;
  141.       }
  142.       bufPos = match.end();
  143.       s++;
  144.     }
  145.     if (state != CDATA_UNK) {
  146.       synched_ = true;
  147.     }
  148.     boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK);
  149.     if (matched) {
  150.       int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
  151.       bin_.reset();
  152.       for (long skiplen = endPos; skiplen > 0; ) {
  153.         skiplen -= bin_.skip(skiplen); // Skip succeeds as we have read this buffer
  154.       }
  155.       pos_ += endPos;
  156.       if (outBufOrNull != null) {
  157.         outBufOrNull.writeBytes(sbuf.substring(0,endPos));
  158.       }
  159.     }
  160.     return matched;
  161.   }
  162.   // states
  163.   final static int CDATA_IN = 10;
  164.   final static int CDATA_OUT = 11;
  165.   final static int CDATA_UNK = 12;
  166.   final static int RECORD_ACCEPT = 13;
  167.   // inputs
  168.   final static int CDATA_BEGIN = 20;
  169.   final static int CDATA_END = 21;
  170.   final static int RECORD_MAYBE = 22;
  171.   /* also updates firstMatchStart_;*/
  172.   int nextState(int state, int input, int bufPos) {
  173.     switch (state) {
  174.     case CDATA_UNK:
  175.     case CDATA_OUT:
  176.       switch (input) {
  177.       case CDATA_BEGIN:
  178.         return CDATA_IN;
  179.       case CDATA_END:
  180.         if (state == CDATA_OUT) {
  181.           //System.out.println("buggy XML " + bufPos);
  182.         }
  183.         return CDATA_OUT;
  184.       case RECORD_MAYBE:
  185.         return (state == CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
  186.       }
  187.       break;
  188.     case CDATA_IN:
  189.       return (input == CDATA_END) ? CDATA_OUT : CDATA_IN;
  190.     }
  191.     throw new IllegalStateException(state + " " + input + " " + bufPos + " " + splitName_);
  192.   }
  193.   Pattern makePatternCDataOrMark(String escapedMark) {
  194.     StringBuffer pat = new StringBuffer();
  195.     addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN
  196.     addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END
  197.     addGroup(pat, escapedMark); // RECORD_MAYBE
  198.     return Pattern.compile(pat.toString());
  199.   }
  200.   void addGroup(StringBuffer pat, String escapedGroup) {
  201.     if (pat.length() > 0) {
  202.       pat.append("|");
  203.     }
  204.     pat.append("(");
  205.     pat.append(escapedGroup);
  206.     pat.append(")");
  207.   }
  208.   boolean fastReadUntilMatch(String textPat, boolean includePat, DataOutputBuffer outBufOrNull) throws IOException {
  209.     byte[] cpat = textPat.getBytes("UTF-8");
  210.     int m = 0;
  211.     boolean match = false;
  212.     int msup = cpat.length;
  213.     int LL = 120000 * 10;
  214.     bin_.mark(LL); // large number to invalidate mark
  215.     while (true) {
  216.       int b = bin_.read();
  217.       if (b == -1) break;
  218.       byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
  219.       if (c == cpat[m]) {
  220.         m++;
  221.         if (m == msup) {
  222.           match = true;
  223.           break;
  224.         }
  225.       } else {
  226.         bin_.mark(LL); // rest mark so we could jump back if we found a match
  227.         if (outBufOrNull != null) {
  228.           outBufOrNull.write(cpat, 0, m);
  229.           outBufOrNull.write(c);
  230.           pos_ += m;
  231.         }
  232.         m = 0;
  233.       }
  234.     }
  235.     if (!includePat && match) {
  236.       bin_.reset();
  237.     } else if (outBufOrNull != null) {
  238.       outBufOrNull.write(cpat);
  239.       pos_ += msup;
  240.     }
  241.     return match;
  242.   }
  243.   String checkJobGet(String prop) throws IOException {
  244.     String val = job_.get(prop);
  245.     if (val == null) {
  246.       throw new IOException("JobConf: missing required property: " + prop);
  247.     }
  248.     return val;
  249.   }
  250.   String beginMark_;
  251.   String endMark_;
  252.   Pattern beginPat_;
  253.   Pattern endPat_;
  254.   boolean slowMatch_;
  255.   int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA. Should be more than max record size
  256.   int maxRecSize_;
  257.   BufferedInputStream bin_; // Wrap FSDataInputStream for efficient backward seeks 
  258.   long pos_; // Keep track on position with respect encapsulated FSDataInputStream  
  259.   final static int NA = -1;
  260.   int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA.
  261.   int firstMatchEnd_ = 0;
  262.   boolean synched_;
  263. }