MultiFileWordCount.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MultiFileInputFormat;
import org.apache.hadoop.mapred.MultiFileSplit;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * MultiFileWordCount is an example to demonstrate the usage of
 * MultiFileInputFormat. This example counts the occurrences of
 * words in the text files under the given input directory.
 */
public class MultiFileWordCount extends Configured implements Tool {

  /**
   * This record keeps {@code <fileName, offset>} pairs.
   */
  public static class WordOffset implements WritableComparable<WordOffset> {

    private long offset;
    private String fileName;

    public void readFields(DataInput in) throws IOException {
      this.offset = in.readLong();
      this.fileName = Text.readString(in);
    }

    public void write(DataOutput out) throws IOException {
      out.writeLong(offset);
      Text.writeString(out, fileName);
    }

    /** Orders keys by file name first, then by offset within the file. */
    public int compareTo(WordOffset that) {
      int f = this.fileName.compareTo(that.fileName);
      if (f == 0) {
        return (int) Math.signum((double) (this.offset - that.offset));
      }
      return f;
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof WordOffset)
        return this.compareTo((WordOffset) obj) == 0;
      return false;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // an arbitrary constant; this key is never hashed
    }
  }
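
  /*
   * A minimal serialization round-trip sketch for WordOffset, kept as a
   * comment so the listing stays compilable. DataOutputBuffer and
   * DataInputBuffer are the stock helpers from org.apache.hadoop.io;
   * the file name and offset below are made-up values.
   *
   *   WordOffset k1 = new WordOffset();
   *   k1.fileName = "part-00000";
   *   k1.offset = 128L;
   *   DataOutputBuffer out = new DataOutputBuffer();
   *   k1.write(out);                          // serialize
   *   DataInputBuffer in = new DataInputBuffer();
   *   in.reset(out.getData(), out.getLength());
   *   WordOffset k2 = new WordOffset();
   *   k2.readFields(in);                      // deserialize
   *   assert k1.compareTo(k2) == 0;           // round trip preserves order
   */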
  /**
   * To use {@link MultiFileInputFormat}, one should extend it to return a
   * (custom) {@link RecordReader}. MultiFileInputFormat uses
   * {@link MultiFileSplit}s.
   */
  public static class MyInputFormat
      extends MultiFileInputFormat<WordOffset, Text> {

    @Override
    public RecordReader<WordOffset, Text> getRecordReader(InputSplit split,
        JobConf job, Reporter reporter) throws IOException {
      return new MultiFileLineRecordReader(job, (MultiFileSplit) split);
    }
  }
  /**
   * A RecordReader is responsible for extracting records from the InputSplit.
   * This record reader accepts a {@link MultiFileSplit}, which encapsulates
   * several whole files; no individual file is divided across splits.
   */
  public static class MultiFileLineRecordReader
      implements RecordReader<WordOffset, Text> {

    private MultiFileSplit split;
    private long offset;    // total bytes of fully consumed files so far
    private long totLength; // total length of all files in the split
    private FileSystem fs;
    private int count = 0;  // index of the file currently being read
    private Path[] paths;

    private FSDataInputStream currentStream;
    private BufferedReader currentReader;

    public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split)
        throws IOException {

      this.split = split;
      // Assumes all paths in the split live on the default FileSystem.
      fs = FileSystem.get(conf);
      this.paths = split.getPaths();
      this.totLength = split.getLength();
      this.offset = 0;

      // open the first file
      Path file = paths[count];
      currentStream = fs.open(file);
      currentReader = new BufferedReader(new InputStreamReader(currentStream));
    }
    public void close() throws IOException {
      // Release the underlying stream if the reader is abandoned early.
      if (currentReader != null) {
        currentReader.close();
        currentReader = null;
        currentStream = null;
      }
    }

    public long getPos() throws IOException {
      long currentOffset = currentStream == null ? 0 : currentStream.getPos();
      return offset + currentOffset;
    }

    public float getProgress() throws IOException {
      // Guard against empty splits to avoid division by zero.
      return totLength == 0 ? 1.0f : ((float) getPos()) / totLength;
    }
    public boolean next(WordOffset key, Text value) throws IOException {
      if (count >= split.getNumPaths())
        return false;

      /* Read from the current file and fill in key and value. If we reach
       * the end of a file, open the next one and continue from there until
       * all files are consumed.
       */
      String line;
      do {
        line = currentReader.readLine();
        if (line == null) {
          // close the exhausted file and account for its full length
          currentReader.close();
          offset += split.getLength(count);

          if (++count >= split.getNumPaths()) // if we are done
            return false;

          // open the next file
          Path file = paths[count];
          currentStream = fs.open(file);
          currentReader =
            new BufferedReader(new InputStreamReader(currentStream));
          key.fileName = file.getName();
        }
      } while (line == null);

      // Update the key and value. Note that BufferedReader reads ahead, so
      // currentStream.getPos() is only an approximation of the position of
      // the line just returned, not its exact starting offset.
      key.offset = currentStream.getPos();
      value.set(line);

      return true;
    }
    public WordOffset createKey() {
      WordOffset wo = new WordOffset();
      // Seed with the first file's name, matching what next() assigns
      // when it switches files.
      wo.fileName = paths[0].getName();
      return wo;
    }

    public Text createValue() {
      return new Text();
    }
  }
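
  /*
   * For orientation: the old-API framework drives a RecordReader roughly as
   * in the sketch below (kept as a comment; this is not the literal
   * framework code).
   *
   *   MultiFileLineRecordReader reader = ...;
   *   WordOffset key = reader.createKey();
   *   Text value = reader.createValue();
   *   while (reader.next(key, value)) {
   *     // the mapper is invoked with (key, value) here
   *   }
   *   reader.close();
   */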
  /**
   * This Mapper is similar to the one in {@link WordCount.MapClass}.
   * It emits LongWritable counts so that its output type matches the
   * {@link LongSumReducer} used below as combiner and reducer.
   */
  public static class MapClass extends MapReduceBase
      implements Mapper<WordOffset, Text, Text, LongWritable> {

    private final static LongWritable one = new LongWritable(1);
    private Text word = new Text();

    public void map(WordOffset key, Text value,
        OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {

      // Split the line on whitespace and emit (word, 1) for each token.
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }
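
  /*
   * Example of the map output (whitespace tokenization only; no
   * normalization of case or punctuation is performed):
   *
   *   input value : "hello hadoop hello"
   *   map output  : (hello, 1), (hadoop, 1), (hello, 1)
   *
   * LongSumReducer then sums the values per key, yielding (hadoop, 1)
   * and (hello, 2).
   */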
  private void printUsage() {
    System.out.println("Usage: multifilewc <input_dir> <output>");
  }
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      printUsage();
      return 1;
    }

    JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
    job.setJobName("MultiFileWordCount");

    // set the InputFormat of the job to our InputFormat
    job.setInputFormat(MyInputFormat.class);

    // the keys are words (strings)
    job.setOutputKeyClass(Text.class);
    // the values are counts (longs, to match LongSumReducer)
    job.setOutputValueClass(LongWritable.class);

    // use the defined mapper
    job.setMapperClass(MapClass.class);
    // use LongSumReducer as both combiner and reducer
    job.setCombinerClass(LongSumReducer.class);
    job.setReducerClass(LongSumReducer.class);

    FileInputFormat.addInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobClient.runJob(job);

    return 0;
  }
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new MultiFileWordCount(), args);
    System.exit(ret);
  }
}
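
A launch sketch, for completeness: the class can be driven programmatically
through ToolRunner, mirroring main() above. The paths below are placeholders,
and the snippet assumes MultiFileWordCount is on the classpath and that the
Hadoop configuration resolves to the target cluster. (From a shell, the usage
string above suggests the example is wired into the examples driver under the
name multifilewc.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.examples.MultiFileWordCount;
    import org.apache.hadoop.util.ToolRunner;

    public class LaunchMultiFileWordCount {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder paths: a directory of text files to count, and an
        // output directory that must not yet exist.
        String[] jobArgs = { "/user/alice/books", "/user/alice/wc-out" };
        System.exit(ToolRunner.run(conf, new MultiFileWordCount(), jobArgs));
      }
    }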