LineDocRecordReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.contrib.index.example;
- import java.io.BufferedInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.contrib.index.mapred.DocumentAndOp;
- import org.apache.hadoop.contrib.index.mapred.DocumentID;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.FileSplit;
- import org.apache.hadoop.mapred.RecordReader;
- /**
- * A simple RecordReader for LineDoc for plain text files where each line is a
- * doc. Each line is as follows: documentID<SPACE>op<SPACE>content<EOF>,
- * where op can be "i", "ins" or "insert" for insert, "d", "del" or "delete"
- * for delete, or "u", "upd" or "update" for update.
- */
- public class LineDocRecordReader implements
- RecordReader<DocumentID, LineDocTextAndOp> {
- private static final char SPACE = ' ';
- private static final char EOL = 'n';
- private long start;
- private long pos;
- private long end;
- private BufferedInputStream in;
- private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
- /**
- * Provide a bridge to get the bytes from the ByteArrayOutputStream without
- * creating a new byte array.
- */
- private static class TextStuffer extends OutputStream {
- public Text target;
- public void write(int b) {
- throw new UnsupportedOperationException("write(byte) not supported");
- }
- public void write(byte[] data, int offset, int len) throws IOException {
- target.set(data, offset, len);
- }
- }
- private TextStuffer bridge = new TextStuffer();
- /**
- * Constructor
- * @param job
- * @param split
- * @throws IOException
- */
- public LineDocRecordReader(Configuration job, FileSplit split)
- throws IOException {
- long start = split.getStart();
- long end = start + split.getLength();
- final Path file = split.getPath();
- // open the file and seek to the start of the split
- FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath());
- InputStream in = fileIn;
- boolean skipFirstLine = false;
- if (start != 0) {
- skipFirstLine = true; // wait till BufferedInputStream to skip
- --start;
- fileIn.seek(start);
- }
- this.in = new BufferedInputStream(in);
- if (skipFirstLine) { // skip first line and re-establish "start".
- start += LineDocRecordReader.readData(this.in, null, EOL);
- }
- this.start = start;
- this.pos = start;
- this.end = end;
- }
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.RecordReader#close()
- */
- public void close() throws IOException {
- in.close();
- }
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.RecordReader#createKey()
- */
- public DocumentID createKey() {
- return new DocumentID();
- }
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.RecordReader#createValue()
- */
- public LineDocTextAndOp createValue() {
- return new LineDocTextAndOp();
- }
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.RecordReader#getPos()
- */
- public long getPos() throws IOException {
- return pos;
- }
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.RecordReader#getProgress()
- */
- public float getProgress() throws IOException {
- if (start == end) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (pos - start) / (float) (end - start));
- }
- }
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.RecordReader#next(java.lang.Object, java.lang.Object)
- */
- public synchronized boolean next(DocumentID key, LineDocTextAndOp value)
- throws IOException {
- if (pos >= end) {
- return false;
- }
- // key is document id, which are bytes until first space
- if (!readInto(key.getText(), SPACE)) {
- return false;
- }
- // read operation: i/d/u, or ins/del/upd, or insert/delete/update
- Text opText = new Text();
- if (!readInto(opText, SPACE)) {
- return false;
- }
- String opStr = opText.toString();
- DocumentAndOp.Op op;
- if (opStr.equals("i") || opStr.equals("ins") || opStr.equals("insert")) {
- op = DocumentAndOp.Op.INSERT;
- } else if (opStr.equals("d") || opStr.equals("del")
- || opStr.equals("delete")) {
- op = DocumentAndOp.Op.DELETE;
- } else if (opStr.equals("u") || opStr.equals("upd")
- || opStr.equals("update")) {
- op = DocumentAndOp.Op.UPDATE;
- } else {
- // default is insert
- op = DocumentAndOp.Op.INSERT;
- }
- value.setOp(op);
- if (op == DocumentAndOp.Op.DELETE) {
- return true;
- } else {
- // read rest of the line
- return readInto(value.getText(), EOL);
- }
- }
- private boolean readInto(Text text, char delimiter) throws IOException {
- buffer.reset();
- long bytesRead = readData(in, buffer, delimiter);
- if (bytesRead == 0) {
- return false;
- }
- pos += bytesRead;
- bridge.target = text;
- buffer.writeTo(bridge);
- return true;
- }
- private static long readData(InputStream in, OutputStream out, char delimiter)
- throws IOException {
- long bytes = 0;
- while (true) {
- int b = in.read();
- if (b == -1) {
- break;
- }
- bytes += 1;
- byte c = (byte) b;
- if (c == EOL || c == delimiter) {
- break;
- }
- if (c == 'r') {
- in.mark(1);
- byte nextC = (byte) in.read();
- if (nextC != EOL || c == delimiter) {
- in.reset();
- } else {
- bytes += 1;
- }
- break;
- }
- if (out != null) {
- out.write(c);
- }
- }
- return bytes;
- }
- }