LineDocRecordReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.index.example;
  19. import java.io.BufferedInputStream;
  20. import java.io.ByteArrayOutputStream;
  21. import java.io.IOException;
  22. import java.io.InputStream;
  23. import java.io.OutputStream;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.contrib.index.mapred.DocumentAndOp;
  26. import org.apache.hadoop.contrib.index.mapred.DocumentID;
  27. import org.apache.hadoop.fs.FSDataInputStream;
  28. import org.apache.hadoop.fs.FileSystem;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.io.Text;
  31. import org.apache.hadoop.mapred.FileSplit;
  32. import org.apache.hadoop.mapred.RecordReader;
  33. /**
  34.  * A simple RecordReader for LineDoc for plain text files where each line is a
  35.  * doc. Each line is as follows: documentID<SPACE>op<SPACE>content<EOF>,
  36.  * where op can be "i", "ins" or "insert" for insert, "d", "del" or "delete"
  37.  * for delete, or "u", "upd" or "update" for update.
  38.  */
  39. public class LineDocRecordReader implements
  40.     RecordReader<DocumentID, LineDocTextAndOp> {
  41.   private static final char SPACE = ' ';
  42.   private static final char EOL = 'n';
  43.   private long start;
  44.   private long pos;
  45.   private long end;
  46.   private BufferedInputStream in;
  47.   private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
  48.   /**
  49.    * Provide a bridge to get the bytes from the ByteArrayOutputStream without
  50.    * creating a new byte array.
  51.    */
  52.   private static class TextStuffer extends OutputStream {
  53.     public Text target;
  54.     public void write(int b) {
  55.       throw new UnsupportedOperationException("write(byte) not supported");
  56.     }
  57.     public void write(byte[] data, int offset, int len) throws IOException {
  58.       target.set(data, offset, len);
  59.     }
  60.   }
  61.   private TextStuffer bridge = new TextStuffer();
  62.   /**
  63.    * Constructor
  64.    * @param job
  65.    * @param split  
  66.    * @throws IOException
  67.    */
  68.   public LineDocRecordReader(Configuration job, FileSplit split)
  69.       throws IOException {
  70.     long start = split.getStart();
  71.     long end = start + split.getLength();
  72.     final Path file = split.getPath();
  73.     // open the file and seek to the start of the split
  74.     FileSystem fs = file.getFileSystem(job);
  75.     FSDataInputStream fileIn = fs.open(split.getPath());
  76.     InputStream in = fileIn;
  77.     boolean skipFirstLine = false;
  78.     if (start != 0) {
  79.       skipFirstLine = true; // wait till BufferedInputStream to skip
  80.       --start;
  81.       fileIn.seek(start);
  82.     }
  83.     this.in = new BufferedInputStream(in);
  84.     if (skipFirstLine) { // skip first line and re-establish "start".
  85.       start += LineDocRecordReader.readData(this.in, null, EOL);
  86.     }
  87.     this.start = start;
  88.     this.pos = start;
  89.     this.end = end;
  90.   }
  91.   /* (non-Javadoc)
  92.    * @see org.apache.hadoop.mapred.RecordReader#close()
  93.    */
  94.   public void close() throws IOException {
  95.     in.close();
  96.   }
  97.   /* (non-Javadoc)
  98.    * @see org.apache.hadoop.mapred.RecordReader#createKey()
  99.    */
  100.   public DocumentID createKey() {
  101.     return new DocumentID();
  102.   }
  103.   /* (non-Javadoc)
  104.    * @see org.apache.hadoop.mapred.RecordReader#createValue()
  105.    */
  106.   public LineDocTextAndOp createValue() {
  107.     return new LineDocTextAndOp();
  108.   }
  109.   /* (non-Javadoc)
  110.    * @see org.apache.hadoop.mapred.RecordReader#getPos()
  111.    */
  112.   public long getPos() throws IOException {
  113.     return pos;
  114.   }
  115.   /* (non-Javadoc)
  116.    * @see org.apache.hadoop.mapred.RecordReader#getProgress()
  117.    */
  118.   public float getProgress() throws IOException {
  119.     if (start == end) {
  120.       return 0.0f;
  121.     } else {
  122.       return Math.min(1.0f, (pos - start) / (float) (end - start));
  123.     }
  124.   }
  125.   /* (non-Javadoc)
  126.    * @see org.apache.hadoop.mapred.RecordReader#next(java.lang.Object, java.lang.Object)
  127.    */
  128.   public synchronized boolean next(DocumentID key, LineDocTextAndOp value)
  129.       throws IOException {
  130.     if (pos >= end) {
  131.       return false;
  132.     }
  133.     // key is document id, which are bytes until first space
  134.     if (!readInto(key.getText(), SPACE)) {
  135.       return false;
  136.     }
  137.     // read operation: i/d/u, or ins/del/upd, or insert/delete/update
  138.     Text opText = new Text();
  139.     if (!readInto(opText, SPACE)) {
  140.       return false;
  141.     }
  142.     String opStr = opText.toString();
  143.     DocumentAndOp.Op op;
  144.     if (opStr.equals("i") || opStr.equals("ins") || opStr.equals("insert")) {
  145.       op = DocumentAndOp.Op.INSERT;
  146.     } else if (opStr.equals("d") || opStr.equals("del")
  147.         || opStr.equals("delete")) {
  148.       op = DocumentAndOp.Op.DELETE;
  149.     } else if (opStr.equals("u") || opStr.equals("upd")
  150.         || opStr.equals("update")) {
  151.       op = DocumentAndOp.Op.UPDATE;
  152.     } else {
  153.       // default is insert
  154.       op = DocumentAndOp.Op.INSERT;
  155.     }
  156.     value.setOp(op);
  157.     if (op == DocumentAndOp.Op.DELETE) {
  158.       return true;
  159.     } else {
  160.       // read rest of the line
  161.       return readInto(value.getText(), EOL);
  162.     }
  163.   }
  164.   private boolean readInto(Text text, char delimiter) throws IOException {
  165.     buffer.reset();
  166.     long bytesRead = readData(in, buffer, delimiter);
  167.     if (bytesRead == 0) {
  168.       return false;
  169.     }
  170.     pos += bytesRead;
  171.     bridge.target = text;
  172.     buffer.writeTo(bridge);
  173.     return true;
  174.   }
  175.   private static long readData(InputStream in, OutputStream out, char delimiter)
  176.       throws IOException {
  177.     long bytes = 0;
  178.     while (true) {
  179.       int b = in.read();
  180.       if (b == -1) {
  181.         break;
  182.       }
  183.       bytes += 1;
  184.       byte c = (byte) b;
  185.       if (c == EOL || c == delimiter) {
  186.         break;
  187.       }
  188.       if (c == 'r') {
  189.         in.mark(1);
  190.         byte nextC = (byte) in.read();
  191.         if (nextC != EOL || c == delimiter) {
  192.           in.reset();
  193.         } else {
  194.           bytes += 1;
  195.         }
  196.         break;
  197.       }
  198.       if (out != null) {
  199.         out.write(c);
  200.       }
  201.     }
  202.     return bytes;
  203.   }
  204. }