CombineFileRecordReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.*;
  20. import java.util.*;
  21. import java.lang.reflect.*;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.mapred.*;
  25. import org.apache.hadoop.conf.Configuration;
  26. /**
  27.  * A generic RecordReader that can hand out different recordReaders
  28.  * for each chunk in a {@link CombineFileSplit}.
  29.  * A CombineFileSplit can combine data chunks from multiple files. 
  30.  * This class allows using different RecordReaders for processing
  31.  * these data chunks from different files.
  32.  * @see CombineFileSplit
  33.  */
  34. public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
  35.   static final Class [] constructorSignature = new Class [] 
  36.                                          {CombineFileSplit.class, 
  37.                                           Configuration.class, 
  38.                                           Reporter.class,
  39.                                           Integer.class};
  40.   protected CombineFileSplit split;
  41.   protected JobConf jc;
  42.   protected Reporter reporter;
  43.   protected Class<RecordReader<K, V>> rrClass;
  44.   protected Constructor<RecordReader<K, V>> rrConstructor;
  45.   protected FileSystem fs;
  46.   
  47.   protected int idx;
  48.   protected long progress;
  49.   protected RecordReader<K, V> curReader;
  50.   
  51.   public boolean next(K key, V value) throws IOException {
  52.     while ((curReader == null) || !curReader.next(key, value)) {
  53.       if (!initNextRecordReader()) {
  54.         return false;
  55.       }
  56.     }
  57.     return true;
  58.   }
  59.   public K createKey() {
  60.     return curReader.createKey();
  61.   }
  62.   
  63.   public V createValue() {
  64.     return curReader.createValue();
  65.   }
  66.   
  67.   /**
  68.    * return the amount of data processed
  69.    */
  70.   public long getPos() throws IOException {
  71.     return progress;
  72.   }
  73.   
  74.   public void close() throws IOException {
  75.     if (curReader != null) {
  76.       curReader.close();
  77.       curReader = null;
  78.     }
  79.   }
  80.   
  81.   /**
  82.    * return progress based on the amount of data processed so far.
  83.    */
  84.   public float getProgress() throws IOException {
  85.     return Math.min(1.0f,  progress/(float)(split.getLength()));
  86.   }
  87.   
  88.   /**
  89.    * A generic RecordReader that can hand out different recordReaders
  90.    * for each chunk in the CombineFileSplit.
  91.    */
  92.   public CombineFileRecordReader(JobConf job, CombineFileSplit split, 
  93.                                  Reporter reporter,
  94.                                  Class<RecordReader<K, V>> rrClass)
  95.     throws IOException {
  96.     this.split = split;
  97.     this.jc = job;
  98.     this.rrClass = rrClass;
  99.     this.reporter = reporter;
  100.     this.idx = 0;
  101.     this.curReader = null;
  102.     this.progress = 0;
  103.     try {
  104.       rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
  105.       rrConstructor.setAccessible(true);
  106.     } catch (Exception e) {
  107.       throw new RuntimeException(rrClass.getName() + 
  108.                                  " does not have valid constructor", e);
  109.     }
  110.     initNextRecordReader();
  111.   }
  112.   
  113.   /**
  114.    * Get the record reader for the next chunk in this CombineFileSplit.
  115.    */
  116.   protected boolean initNextRecordReader() throws IOException {
  117.     if (curReader != null) {
  118.       curReader.close();
  119.       curReader = null;
  120.       if (idx > 0) {
  121.         progress += split.getLength(idx-1);    // done processing so far
  122.       }
  123.     }
  124.     // if all chunks have been processed, nothing more to do.
  125.     if (idx == split.getNumPaths()) {
  126.       return false;
  127.     }
  128.     // get a record reader for the idx-th chunk
  129.     try {
  130.       curReader =  rrConstructor.newInstance(new Object [] 
  131.                             {split, jc, reporter, Integer.valueOf(idx)});
  132.       // setup some helper config variables.
  133.       jc.set("map.input.file", split.getPath(idx).toString());
  134.       jc.setLong("map.input.start", split.getOffset(idx));
  135.       jc.setLong("map.input.length", split.getLength(idx));
  136.     } catch (Exception e) {
  137.       throw new RuntimeException (e);
  138.     }
  139.     idx++;
  140.     return true;
  141.   }
  142. }