MultiFileWordCount.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples;
  19. import java.io.BufferedReader;
  20. import java.io.DataInput;
  21. import java.io.DataOutput;
  22. import java.io.IOException;
  23. import java.io.InputStreamReader;
  24. import java.util.StringTokenizer;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.conf.Configured;
  27. import org.apache.hadoop.fs.FSDataInputStream;
  28. import org.apache.hadoop.fs.FileSystem;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.io.IntWritable;
  31. import org.apache.hadoop.io.Text;
  32. import org.apache.hadoop.io.WritableComparable;
  33. import org.apache.hadoop.mapred.FileInputFormat;
  34. import org.apache.hadoop.mapred.FileOutputFormat;
  35. import org.apache.hadoop.mapred.InputSplit;
  36. import org.apache.hadoop.mapred.JobClient;
  37. import org.apache.hadoop.mapred.JobConf;
  38. import org.apache.hadoop.mapred.MapReduceBase;
  39. import org.apache.hadoop.mapred.Mapper;
  40. import org.apache.hadoop.mapred.MultiFileInputFormat;
  41. import org.apache.hadoop.mapred.MultiFileSplit;
  42. import org.apache.hadoop.mapred.OutputCollector;
  43. import org.apache.hadoop.mapred.RecordReader;
  44. import org.apache.hadoop.mapred.Reporter;
  45. import org.apache.hadoop.mapred.lib.LongSumReducer;
  46. import org.apache.hadoop.util.Tool;
  47. import org.apache.hadoop.util.ToolRunner;
  48. /**
  49.  * MultiFileWordCount is an example to demonstrate the usage of 
  50.  * MultiFileInputFormat. This examples counts the occurrences of
  51.  * words in the text files under the given input directory.
  52.  */
  53. public class MultiFileWordCount extends Configured implements Tool {
  54.   /**
  55.    * This record keeps <filename,offset> pairs.
  56.    */
  57.   public static class WordOffset implements WritableComparable {
  58.     private long offset;
  59.     private String fileName;
  60.     public void readFields(DataInput in) throws IOException {
  61.       this.offset = in.readLong();
  62.       this.fileName = Text.readString(in);
  63.     }
  64.     public void write(DataOutput out) throws IOException {
  65.       out.writeLong(offset);
  66.       Text.writeString(out, fileName);
  67.     }
  68.     public int compareTo(Object o) {
  69.       WordOffset that = (WordOffset)o;
  70.       int f = this.fileName.compareTo(that.fileName);
  71.       if(f == 0) {
  72.         return (int)Math.signum((double)(this.offset - that.offset));
  73.       }
  74.       return f;
  75.     }
  76.     @Override
  77.     public boolean equals(Object obj) {
  78.       if(obj instanceof WordOffset)
  79.         return this.compareTo(obj) == 0;
  80.       return false;
  81.     }
  82.     @Override
  83.     public int hashCode() {
  84.       assert false : "hashCode not designed";
  85.       return 42; //an arbitrary constant
  86.     }
  87.   }
  88.   /**
  89.    * To use {@link MultiFileInputFormat}, one should extend it, to return a 
  90.    * (custom) {@link RecordReader}. MultiFileInputFormat uses 
  91.    * {@link MultiFileSplit}s. 
  92.    */
  93.   public static class MyInputFormat 
  94.     extends MultiFileInputFormat<WordOffset, Text>  {
  95.     @Override
  96.     public RecordReader<WordOffset,Text> getRecordReader(InputSplit split
  97.         , JobConf job, Reporter reporter) throws IOException {
  98.       return new MultiFileLineRecordReader(job, (MultiFileSplit)split);
  99.     }
  100.   }
  101.   /**
  102.    * RecordReader is responsible from extracting records from the InputSplit. 
  103.    * This record reader accepts a {@link MultiFileSplit}, which encapsulates several 
  104.    * files, and no file is divided.
  105.    */
  106.   public static class MultiFileLineRecordReader 
  107.     implements RecordReader<WordOffset, Text> {
  108.     private MultiFileSplit split;
  109.     private long offset; //total offset read so far;
  110.     private long totLength;
  111.     private FileSystem fs;
  112.     private int count = 0;
  113.     private Path[] paths;
  114.     
  115.     private FSDataInputStream currentStream;
  116.     private BufferedReader currentReader;
  117.     
  118.     public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split)
  119.       throws IOException {
  120.       
  121.       this.split = split;
  122.       fs = FileSystem.get(conf);
  123.       this.paths = split.getPaths();
  124.       this.totLength = split.getLength();
  125.       this.offset = 0;
  126.       
  127.       //open the first file
  128.       Path file = paths[count];
  129.       currentStream = fs.open(file);
  130.       currentReader = new BufferedReader(new InputStreamReader(currentStream));
  131.     }
  132.     public void close() throws IOException { }
  133.     public long getPos() throws IOException {
  134.       long currentOffset = currentStream == null ? 0 : currentStream.getPos();
  135.       return offset + currentOffset;
  136.     }
  137.     public float getProgress() throws IOException {
  138.       return ((float)getPos()) / totLength;
  139.     }
  140.     public boolean next(WordOffset key, Text value) throws IOException {
  141.       if(count >= split.getNumPaths())
  142.         return false;
  143.       /* Read from file, fill in key and value, if we reach the end of file,
  144.        * then open the next file and continue from there until all files are
  145.        * consumed.  
  146.        */
  147.       String line;
  148.       do {
  149.         line = currentReader.readLine();
  150.         if(line == null) {
  151.           //close the file
  152.           currentReader.close();
  153.           offset += split.getLength(count);
  154.           
  155.           if(++count >= split.getNumPaths()) //if we are done
  156.             return false;
  157.           
  158.           //open a new file
  159.           Path file = paths[count];
  160.           currentStream = fs.open(file);
  161.           currentReader=new BufferedReader(new InputStreamReader(currentStream));
  162.           key.fileName = file.getName();
  163.         }
  164.       } while(line == null);
  165.       //update the key and value
  166.       key.offset = currentStream.getPos();
  167.       value.set(line);
  168.       
  169.       return true;
  170.     }
  171.     public WordOffset createKey() {
  172.       WordOffset wo = new WordOffset();
  173.       wo.fileName = paths[0].toString(); //set as the first file
  174.       return wo;
  175.     }
  176.     public Text createValue() {
  177.       return new Text();
  178.     }
  179.   }
  180.   /**
  181.    * This Mapper is similar to the one in {@link WordCount.MapClass}.
  182.    */
  183.   public static class MapClass extends MapReduceBase
  184.     implements Mapper<WordOffset, Text, Text, IntWritable> {
  185.     private final static IntWritable one = new IntWritable(1);
  186.     private Text word = new Text();
  187.     
  188.     public void map(WordOffset key, Text value,
  189.         OutputCollector<Text, IntWritable> output, Reporter reporter)
  190.         throws IOException {
  191.       
  192.       String line = value.toString();
  193.       StringTokenizer itr = new StringTokenizer(line);
  194.       while (itr.hasMoreTokens()) {
  195.         word.set(itr.nextToken());
  196.         output.collect(word, one);
  197.       }
  198.     }
  199.   }
  200.   
  201.   
  202.   private void printUsage() {
  203.     System.out.println("Usage : multifilewc <input_dir> <output>" );
  204.   }
  205.   public int run(String[] args) throws Exception {
  206.     if(args.length < 2) {
  207.       printUsage();
  208.       return 1;
  209.     }
  210.     JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
  211.     job.setJobName("MultiFileWordCount");
  212.     //set the InputFormat of the job to our InputFormat
  213.     job.setInputFormat(MyInputFormat.class);
  214.     
  215.     // the keys are words (strings)
  216.     job.setOutputKeyClass(Text.class);
  217.     // the values are counts (ints)
  218.     job.setOutputValueClass(IntWritable.class);
  219.     //use the defined mapper
  220.     job.setMapperClass(MapClass.class);
  221.     //use the WordCount Reducer
  222.     job.setCombinerClass(LongSumReducer.class);
  223.     job.setReducerClass(LongSumReducer.class);
  224.     FileInputFormat.addInputPaths(job, args[0]);
  225.     FileOutputFormat.setOutputPath(job, new Path(args[1]));
  226.     JobClient.runJob(job);
  227.     
  228.     return 0;
  229.   }
  230.   public static void main(String[] args) throws Exception {
  231.     int ret = ToolRunner.run(new MultiFileWordCount(), args);
  232.     System.exit(ret);
  233.   }
  234. }