StreamInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.io.*;
  20. import java.lang.reflect.*;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.fs.FSDataInputStream;
  23. import org.apache.hadoop.io.Text;
  24. import org.apache.hadoop.mapred.*;
  25. /** An input format that selects a RecordReader based on a JobConf property.
  26.  *  This should be used only for non-standard record reader such as 
  27.  *  StreamXmlRecordReader. For all other standard 
  28.  *  record readers, the appropriate input format classes should be used.
  29.  */
  30. public class StreamInputFormat extends KeyValueTextInputFormat {
  31.   @SuppressWarnings("unchecked")
  32.   public RecordReader<Text, Text> getRecordReader(final InputSplit genericSplit,
  33.                                       JobConf job, Reporter reporter) throws IOException {
  34.     String c = job.get("stream.recordreader.class");
  35.     if (c == null || c.indexOf("LineRecordReader") >= 0) {
  36.       return super.getRecordReader(genericSplit, job, reporter);
  37.     }
  38.     // handling non-standard record reader (likely StreamXmlRecordReader) 
  39.     FileSplit split = (FileSplit) genericSplit;
  40.     LOG.info("getRecordReader start.....split=" + split);
  41.     reporter.setStatus(split.toString());
  42.     // Open the file and seek to the start of the split
  43.     FileSystem fs = split.getPath().getFileSystem(job);
  44.     FSDataInputStream in = fs.open(split.getPath());
  45.     // Factory dispatch based on available params..
  46.     Class readerClass;
  47.     {
  48.       readerClass = StreamUtil.goodClassOrNull(job, c, null);
  49.       if (readerClass == null) {
  50.         throw new RuntimeException("Class not found: " + c);
  51.       }
  52.     }
  53.     Constructor ctor;
  54.     try {
  55.       ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class,
  56.                                                       FileSplit.class, Reporter.class, JobConf.class, FileSystem.class });
  57.     } catch (NoSuchMethodException nsm) {
  58.       throw new RuntimeException(nsm);
  59.     }
  60.     RecordReader<Text, Text> reader;
  61.     try {
  62.       reader = (RecordReader<Text, Text>) ctor.newInstance(new Object[] { in, split,
  63.                                                               reporter, job, fs });
  64.     } catch (Exception nsm) {
  65.       throw new RuntimeException(nsm);
  66.     }
  67.     return reader;
  68.   }
  69. }