StreamBaseRecordReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.io.*;
  20. import org.apache.hadoop.io.Text;
  21. import org.apache.hadoop.io.Writable;
  22. import org.apache.hadoop.io.WritableComparable;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.fs.FileSystem;
  25. import org.apache.hadoop.fs.FSDataInputStream;
  26. import org.apache.hadoop.mapred.Reporter;
  27. import org.apache.hadoop.mapred.RecordReader;
  28. import org.apache.hadoop.mapred.FileSplit;
  29. import org.apache.hadoop.mapred.JobConf;
  30. import org.apache.commons.logging.*;
  31. /** 
  32.  * Shared functionality for hadoopStreaming formats.
  33.  * A custom reader can be defined to be a RecordReader with the constructor below
  34.  * and is selected with the option bin/hadoopStreaming -inputreader ...
  35.  * @see StreamXmlRecordReader 
  36.  */
  37. public abstract class StreamBaseRecordReader implements RecordReader<Text, Text> {
  38.   protected static final Log LOG = LogFactory.getLog(StreamBaseRecordReader.class.getName());
  39.   // custom JobConf properties for this class are prefixed with this namespace
  40.   final static String CONF_NS = "stream.recordreader.";
  41.   public StreamBaseRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
  42.                                 JobConf job, FileSystem fs) throws IOException {
  43.     in_ = in;
  44.     split_ = split;
  45.     start_ = split_.getStart();
  46.     length_ = split_.getLength();
  47.     end_ = start_ + length_;
  48.     splitName_ = split_.getPath().getName();
  49.     reporter_ = reporter;
  50.     job_ = job;
  51.     fs_ = fs;
  52.     statusMaxRecordChars_ = job_.getInt(CONF_NS + "statuschars", 200);
  53.   }
  54.   /// RecordReader API
  55.   /** Read a record. Implementation should call numRecStats at the end
  56.    */
  57.   public abstract boolean next(Text key, Text value) throws IOException;
  58.   /** Returns the current position in the input. */
  59.   public synchronized long getPos() throws IOException {
  60.     return in_.getPos();
  61.   }
  62.   /** Close this to future operations.*/
  63.   public synchronized void close() throws IOException {
  64.     in_.close();
  65.   }
  66.   public float getProgress() throws IOException {
  67.     if (end_ == start_) {
  68.       return 1.0f;
  69.     } else {
  70.       return ((float)(in_.getPos() - start_)) / ((float)(end_ - start_));
  71.     }
  72.   }
  73.   
  74.   public Text createKey() {
  75.     return new Text();
  76.   }
  77.   public Text createValue() {
  78.     return new Text();
  79.   }
  80.   /// StreamBaseRecordReader API
  81.   /** Implementation should seek forward in_ to the first byte of the next record.
  82.    *  The initial byte offset in the stream is arbitrary.
  83.    */
  84.   public abstract void seekNextRecordBoundary() throws IOException;
  85.   void numRecStats(byte[] record, int start, int len) throws IOException {
  86.     numRec_++;
  87.     if (numRec_ == nextStatusRec_) {
  88.       String recordStr = new String(record, start, Math.min(len, statusMaxRecordChars_), "UTF-8");
  89.       nextStatusRec_ += 100;//*= 10;
  90.       String status = getStatus(recordStr);
  91.       LOG.info(status);
  92.       reporter_.setStatus(status);
  93.     }
  94.   }
  95.   long lastMem = 0;
  96.   String getStatus(CharSequence record) {
  97.     long pos = -1;
  98.     try {
  99.       pos = getPos();
  100.     } catch (IOException io) {
  101.     }
  102.     String recStr;
  103.     if (record.length() > statusMaxRecordChars_) {
  104.       recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
  105.     } else {
  106.       recStr = record.toString();
  107.     }
  108.     String unqualSplit = split_.getPath().getName() + ":" +
  109.                          split_.getStart() + "+" + split_.getLength();
  110.     String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " " + unqualSplit
  111.       + " Processing record=" + recStr;
  112.     status += " " + splitName_;
  113.     return status;
  114.   }
  115.   FSDataInputStream in_;
  116.   FileSplit split_;
  117.   long start_;
  118.   long end_;
  119.   long length_;
  120.   String splitName_;
  121.   Reporter reporter_;
  122.   JobConf job_;
  123.   FileSystem fs_;
  124.   int numRec_ = 0;
  125.   int nextStatusRec_ = 1;
  126.   int statusMaxRecordChars_;
  127. }