LineReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.util;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.io.Text;
  23. /**
  24.  * A class that provides a line reader from an input stream.
  25.  */
  26. public class LineReader {
  27.   private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
  28.   private int bufferSize = DEFAULT_BUFFER_SIZE;
  29.   private InputStream in;
  30.   private byte[] buffer;
  31.   // the number of bytes of real data in the buffer
  32.   private int bufferLength = 0;
  33.   // the current position in the buffer
  34.   private int bufferPosn = 0;
  35.   private static final byte CR = 'r';
  36.   private static final byte LF = 'n';
  37.   /**
  38.    * Create a line reader that reads from the given stream using the
  39.    * default buffer-size (64k).
  40.    * @param in The input stream
  41.    * @throws IOException
  42.    */
  43.   public LineReader(InputStream in) {
  44.     this(in, DEFAULT_BUFFER_SIZE);
  45.   }
  46.   /**
  47.    * Create a line reader that reads from the given stream using the 
  48.    * given buffer-size.
  49.    * @param in The input stream
  50.    * @param bufferSize Size of the read buffer
  51.    * @throws IOException
  52.    */
  53.   public LineReader(InputStream in, int bufferSize) {
  54.     this.in = in;
  55.     this.bufferSize = bufferSize;
  56.     this.buffer = new byte[this.bufferSize];
  57.   }
  58.   /**
  59.    * Create a line reader that reads from the given stream using the
  60.    * <code>io.file.buffer.size</code> specified in the given
  61.    * <code>Configuration</code>.
  62.    * @param in input stream
  63.    * @param conf configuration
  64.    * @throws IOException
  65.    */
  66.   public LineReader(InputStream in, Configuration conf) throws IOException {
  67.     this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
  68.   }
  69.   /**
  70.    * Close the underlying stream.
  71.    * @throws IOException
  72.    */
  73.   public void close() throws IOException {
  74.     in.close();
  75.   }
  76.   
  77.   /**
  78.    * Read one line from the InputStream into the given Text.  A line
  79.    * can be terminated by one of the following: 'n' (LF) , 'r' (CR),
  80.    * or 'rn' (CR+LF).  EOF also terminates an otherwise unterminated
  81.    * line.
  82.    *
  83.    * @param str the object to store the given line (without newline)
  84.    * @param maxLineLength the maximum number of bytes to store into str;
  85.    *  the rest of the line is silently discarded.
  86.    * @param maxBytesToConsume the maximum number of bytes to consume
  87.    *  in this call.  This is only a hint, because if the line cross
  88.    *  this threshold, we allow it to happen.  It can overshoot
  89.    *  potentially by as much as one buffer length.
  90.    *
  91.    * @return the number of bytes read including the (longest) newline
  92.    * found.
  93.    *
  94.    * @throws IOException if the underlying stream throws
  95.    */
  96.   public int readLine(Text str, int maxLineLength,
  97.                       int maxBytesToConsume) throws IOException {
  98.     /* We're reading data from in, but the head of the stream may be
  99.      * already buffered in buffer, so we have several cases:
  100.      * 1. No newline characters are in the buffer, so we need to copy
  101.      *    everything and read another buffer from the stream.
  102.      * 2. An unambiguously terminated line is in buffer, so we just
  103.      *    copy to str.
  104.      * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
  105.      *    in CR.  In this case we copy everything up to CR to str, but
  106.      *    we also need to see what follows CR: if it's LF, then we
  107.      *    need consume LF as well, so next call to readLine will read
  108.      *    from after that.
  109.      * We use a flag prevCharCR to signal if previous character was CR
  110.      * and, if it happens to be at the end of the buffer, delay
  111.      * consuming it until we have a chance to look at the char that
  112.      * follows.
  113.      */
  114.     str.clear();
  115.     int txtLength = 0; //tracks str.getLength(), as an optimization
  116.     int newlineLength = 0; //length of terminating newline
  117.     boolean prevCharCR = false; //true of prev char was CR
  118.     long bytesConsumed = 0;
  119.     do {
  120.       int startPosn = bufferPosn; //starting from where we left off the last time
  121.       if (bufferPosn >= bufferLength) {
  122.         startPosn = bufferPosn = 0;
  123.         if (prevCharCR)
  124.           ++bytesConsumed; //account for CR from previous read
  125.         bufferLength = in.read(buffer);
  126.         if (bufferLength <= 0)
  127.           break; // EOF
  128.       }
  129.       for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
  130.         if (buffer[bufferPosn] == LF) {
  131.           newlineLength = (prevCharCR) ? 2 : 1;
  132.           ++bufferPosn; // at next invocation proceed from following byte
  133.           break;
  134.         }
  135.         if (prevCharCR) { //CR + notLF, we are at notLF
  136.           newlineLength = 1;
  137.           break;
  138.         }
  139.         prevCharCR = (buffer[bufferPosn] == CR);
  140.       }
  141.       int readLength = bufferPosn - startPosn;
  142.       if (prevCharCR && newlineLength == 0)
  143.         --readLength; //CR at the end of the buffer
  144.       bytesConsumed += readLength;
  145.       int appendLength = readLength - newlineLength;
  146.       if (appendLength > maxLineLength - txtLength) {
  147.         appendLength = maxLineLength - txtLength;
  148.       }
  149.       if (appendLength > 0) {
  150.         str.append(buffer, startPosn, appendLength);
  151.         txtLength += appendLength;
  152.       }
  153.     } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
  154.     if (bytesConsumed > (long)Integer.MAX_VALUE)
  155.       throw new IOException("Too many bytes before newline: " + bytesConsumed);    
  156.     return (int)bytesConsumed;
  157.   }
  158.   /**
  159.    * Read from the InputStream into the given Text.
  160.    * @param str the object to store the given line
  161.    * @param maxLineLength the maximum number of bytes to store into str.
  162.    * @return the number of bytes read including the newline
  163.    * @throws IOException if the underlying stream throws
  164.    */
  165.   public int readLine(Text str, int maxLineLength) throws IOException {
  166.     return readLine(str, maxLineLength, Integer.MAX_VALUE);
  167. }
  168.   /**
  169.    * Read from the InputStream into the given Text.
  170.    * @param str the object to store the given line
  171.    * @return the number of bytes read including the newline
  172.    * @throws IOException if the underlying stream throws
  173.    */
  174.   public int readLine(Text str) throws IOException {
  175.     return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
  176.   }
  177. }