SequenceFileAsBinaryInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.io.BytesWritable;
  24. import org.apache.hadoop.io.DataOutputBuffer;
  25. import org.apache.hadoop.io.SequenceFile;
  26. import org.apache.hadoop.mapred.InputSplit;
  27. import org.apache.hadoop.mapred.JobConf;
  28. import org.apache.hadoop.mapred.RecordReader;
  29. import org.apache.hadoop.mapred.Reporter;
  30. import org.apache.hadoop.mapred.SequenceFileInputFormat;
  31. /**
  32.  * InputFormat reading keys, values from SequenceFiles in binary (raw)
  33.  * format.
  34.  */
  35. public class SequenceFileAsBinaryInputFormat
  36.     extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
  37.   public SequenceFileAsBinaryInputFormat() {
  38.     super();
  39.   }
  40.   public RecordReader<BytesWritable,BytesWritable> getRecordReader(
  41.       InputSplit split, JobConf job, Reporter reporter)
  42.       throws IOException {
  43.     return new SequenceFileAsBinaryRecordReader(job, (FileSplit)split);
  44.   }
  45.   /**
  46.    * Read records from a SequenceFile as binary (raw) bytes.
  47.    */
  48.   public static class SequenceFileAsBinaryRecordReader
  49.       implements RecordReader<BytesWritable,BytesWritable> {
  50.     private SequenceFile.Reader in;
  51.     private long start;
  52.     private long end;
  53.     private boolean done = false;
  54.     private DataOutputBuffer buffer = new DataOutputBuffer();
  55.     private SequenceFile.ValueBytes vbytes;
  56.     public SequenceFileAsBinaryRecordReader(Configuration conf, FileSplit split)
  57.         throws IOException {
  58.       Path path = split.getPath();
  59.       FileSystem fs = path.getFileSystem(conf);
  60.       this.in = new SequenceFile.Reader(fs, path, conf);
  61.       this.end = split.getStart() + split.getLength();
  62.       if (split.getStart() > in.getPosition())
  63.         in.sync(split.getStart());                  // sync to start
  64.       this.start = in.getPosition();
  65.       vbytes = in.createValueBytes();
  66.       done = start >= end;
  67.     }
  68.     public BytesWritable createKey() {
  69.       return new BytesWritable();
  70.     }
  71.     public BytesWritable createValue() {
  72.       return new BytesWritable();
  73.     }
  74.     /**
  75.      * Retrieve the name of the key class for this SequenceFile.
  76.      * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
  77.      */
  78.     public String getKeyClassName() {
  79.       return in.getKeyClassName();
  80.     }
  81.     /**
  82.      * Retrieve the name of the value class for this SequenceFile.
  83.      * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
  84.      */
  85.     public String getValueClassName() {
  86.       return in.getValueClassName();
  87.     }
  88.     /**
  89.      * Read raw bytes from a SequenceFile.
  90.      */
  91.     public synchronized boolean next(BytesWritable key, BytesWritable val)
  92.         throws IOException {
  93.       if (done) return false;
  94.       long pos = in.getPosition();
  95.       boolean eof = -1 == in.nextRawKey(buffer);
  96.       if (!eof) {
  97.         key.set(buffer.getData(), 0, buffer.getLength());
  98.         buffer.reset();
  99.         in.nextRawValue(vbytes);
  100.         vbytes.writeUncompressedBytes(buffer);
  101.         val.set(buffer.getData(), 0, buffer.getLength());
  102.         buffer.reset();
  103.       }
  104.       return !(done = (eof || (pos >= end && in.syncSeen())));
  105.     }
  106.     public long getPos() throws IOException {
  107.       return in.getPosition();
  108.     }
  109.     public void close() throws IOException {
  110.       in.close();
  111.     }
  112.     /**
  113.      * Return the progress within the input split
  114.      * @return 0.0 to 1.0 of the input byte range
  115.      */
  116.     public float getProgress() throws IOException {
  117.       if (end == start) {
  118.         return 0.0f;
  119.       } else {
  120.         return Math.min(1.0f, (float)((in.getPosition() - start) /
  121.                                       (double)(end - start)));
  122.       }
  123.     }
  124.   }
  125. }