WordCountInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.pipes;
  19. import java.io.*;
  20. import java.util.*;
  21. import org.apache.hadoop.fs.*;
  22. import org.apache.hadoop.io.*;
  23. import org.apache.hadoop.mapred.*;
  24. /**
  25.  * This is a support class to test Hadoop Pipes when using C++ RecordReaders.
  26.  * It defines an InputFormat with InputSplits that are just strings. The
  27.  * RecordReaders are not implemented in Java, naturally...
  28.  */
  29. public class WordCountInputFormat
  30.   extends FileInputFormat<IntWritable, Text> {
  31.   
  32.   static class WordCountInputSplit implements InputSplit  {
  33.     private String filename;
  34.     WordCountInputSplit() { }
  35.     WordCountInputSplit(Path filename) {
  36.       this.filename = filename.toUri().getPath();
  37.     }
  38.     public void write(DataOutput out) throws IOException { 
  39.       Text.writeString(out, filename); 
  40.     }
  41.     public void readFields(DataInput in) throws IOException { 
  42.       filename = Text.readString(in); 
  43.     }
  44.     public long getLength() { return 0L; }
  45.     public String[] getLocations() { return new String[0]; }
  46.   }
  47.   public InputSplit[] getSplits(JobConf conf, 
  48.                                 int numSplits) throws IOException {
  49.     ArrayList<InputSplit> result = new ArrayList<InputSplit>();
  50.     FileSystem local = FileSystem.getLocal(conf);
  51.     for(Path dir: getInputPaths(conf)) {
  52.       for(FileStatus file: local.listStatus(dir)) {
  53.         result.add(new WordCountInputSplit(file.getPath()));
  54.       }
  55.     }
  56.     return result.toArray(new InputSplit[result.size()]);
  57.   }
  58.   public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
  59.                                                          JobConf conf, 
  60.                                                          Reporter reporter) {
  61.     return new RecordReader<IntWritable, Text>(){
  62.       public boolean next(IntWritable key, Text value) throws IOException {
  63.         return false;
  64.       }
  65.       public IntWritable createKey() {
  66.         return new IntWritable();
  67.       }
  68.       public Text createValue() {
  69.         return new Text();
  70.       }
  71.       public long getPos() {
  72.         return 0;
  73.       }
  74.       public void close() { }
  75.       public float getProgress() { 
  76.         return 0.0f;
  77.       }
  78.     };
  79.   }
  80. }