LineReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.util;
- import java.io.IOException;
- import java.io.InputStream;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.Text;
- /**
- * A class that provides a line reader from an input stream.
- */
- public class LineReader {
- private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
- private int bufferSize = DEFAULT_BUFFER_SIZE;
- private InputStream in;
- private byte[] buffer;
- // the number of bytes of real data in the buffer
- private int bufferLength = 0;
- // the current position in the buffer
- private int bufferPosn = 0;
- private static final byte CR = 'r';
- private static final byte LF = 'n';
- /**
- * Create a line reader that reads from the given stream using the
- * default buffer-size (64k).
- * @param in The input stream
- * @throws IOException
- */
- public LineReader(InputStream in) {
- this(in, DEFAULT_BUFFER_SIZE);
- }
- /**
- * Create a line reader that reads from the given stream using the
- * given buffer-size.
- * @param in The input stream
- * @param bufferSize Size of the read buffer
- * @throws IOException
- */
- public LineReader(InputStream in, int bufferSize) {
- this.in = in;
- this.bufferSize = bufferSize;
- this.buffer = new byte[this.bufferSize];
- }
- /**
- * Create a line reader that reads from the given stream using the
- * <code>io.file.buffer.size</code> specified in the given
- * <code>Configuration</code>.
- * @param in input stream
- * @param conf configuration
- * @throws IOException
- */
- public LineReader(InputStream in, Configuration conf) throws IOException {
- this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
- }
- /**
- * Close the underlying stream.
- * @throws IOException
- */
- public void close() throws IOException {
- in.close();
- }
-
- /**
- * Read one line from the InputStream into the given Text. A line
- * can be terminated by one of the following: 'n' (LF) , 'r' (CR),
- * or 'rn' (CR+LF). EOF also terminates an otherwise unterminated
- * line.
- *
- * @param str the object to store the given line (without newline)
- * @param maxLineLength the maximum number of bytes to store into str;
- * the rest of the line is silently discarded.
- * @param maxBytesToConsume the maximum number of bytes to consume
- * in this call. This is only a hint, because if the line cross
- * this threshold, we allow it to happen. It can overshoot
- * potentially by as much as one buffer length.
- *
- * @return the number of bytes read including the (longest) newline
- * found.
- *
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength,
- int maxBytesToConsume) throws IOException {
- /* We're reading data from in, but the head of the stream may be
- * already buffered in buffer, so we have several cases:
- * 1. No newline characters are in the buffer, so we need to copy
- * everything and read another buffer from the stream.
- * 2. An unambiguously terminated line is in buffer, so we just
- * copy to str.
- * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
- * in CR. In this case we copy everything up to CR to str, but
- * we also need to see what follows CR: if it's LF, then we
- * need consume LF as well, so next call to readLine will read
- * from after that.
- * We use a flag prevCharCR to signal if previous character was CR
- * and, if it happens to be at the end of the buffer, delay
- * consuming it until we have a chance to look at the char that
- * follows.
- */
- str.clear();
- int txtLength = 0; //tracks str.getLength(), as an optimization
- int newlineLength = 0; //length of terminating newline
- boolean prevCharCR = false; //true of prev char was CR
- long bytesConsumed = 0;
- do {
- int startPosn = bufferPosn; //starting from where we left off the last time
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- if (prevCharCR)
- ++bytesConsumed; //account for CR from previous read
- bufferLength = in.read(buffer);
- if (bufferLength <= 0)
- break; // EOF
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
- if (buffer[bufferPosn] == LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
- }
- if (prevCharCR) { //CR + notLF, we are at notLF
- newlineLength = 1;
- break;
- }
- prevCharCR = (buffer[bufferPosn] == CR);
- }
- int readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0)
- --readLength; //CR at the end of the buffer
- bytesConsumed += readLength;
- int appendLength = readLength - newlineLength;
- if (appendLength > maxLineLength - txtLength) {
- appendLength = maxLineLength - txtLength;
- }
- if (appendLength > 0) {
- str.append(buffer, startPosn, appendLength);
- txtLength += appendLength;
- }
- } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
- if (bytesConsumed > (long)Integer.MAX_VALUE)
- throw new IOException("Too many bytes before newline: " + bytesConsumed);
- return (int)bytesConsumed;
- }
- /**
- * Read from the InputStream into the given Text.
- * @param str the object to store the given line
- * @param maxLineLength the maximum number of bytes to store into str.
- * @return the number of bytes read including the newline
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength) throws IOException {
- return readLine(str, maxLineLength, Integer.MAX_VALUE);
- }
- /**
- * Read from the InputStream into the given Text.
- * @param str the object to store the given line
- * @return the number of bytes read including the newline
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str) throws IOException {
- return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
- }
- }