WordCount.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Iterator;
  22. import java.util.List;
  23. import java.util.StringTokenizer;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.conf.Configured;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.io.IntWritable;
  28. import org.apache.hadoop.io.LongWritable;
  29. import org.apache.hadoop.io.Text;
  30. import org.apache.hadoop.mapred.FileInputFormat;
  31. import org.apache.hadoop.mapred.FileOutputFormat;
  32. import org.apache.hadoop.mapred.JobClient;
  33. import org.apache.hadoop.mapred.JobConf;
  34. import org.apache.hadoop.mapred.MapReduceBase;
  35. import org.apache.hadoop.mapred.Mapper;
  36. import org.apache.hadoop.mapred.OutputCollector;
  37. import org.apache.hadoop.mapred.Reducer;
  38. import org.apache.hadoop.mapred.Reporter;
  39. import org.apache.hadoop.util.Tool;
  40. import org.apache.hadoop.util.ToolRunner;
  41. /**
  42.  * This is an example Hadoop Map/Reduce application.
  43.  * It reads the text input files, breaks each line into words
  44.  * and counts them. The output is a locally sorted list of words and the 
  45.  * count of how often they occurred.
  46.  *
  47.  * To run: bin/hadoop jar build/hadoop-examples.jar wordcount
  48.  *            [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i> 
  49.  */
  50. public class WordCount extends Configured implements Tool {
  51.   
  52.   /**
  53.    * Counts the words in each line.
  54.    * For each line of input, break the line into words and emit them as
  55.    * (<b>word</b>, <b>1</b>).
  56.    */
  57.   public static class MapClass extends MapReduceBase
  58.     implements Mapper<LongWritable, Text, Text, IntWritable> {
  59.     
  60.     private final static IntWritable one = new IntWritable(1);
  61.     private Text word = new Text();
  62.     
  63.     public void map(LongWritable key, Text value, 
  64.                     OutputCollector<Text, IntWritable> output, 
  65.                     Reporter reporter) throws IOException {
  66.       String line = value.toString();
  67.       StringTokenizer itr = new StringTokenizer(line);
  68.       while (itr.hasMoreTokens()) {
  69.         word.set(itr.nextToken());
  70.         output.collect(word, one);
  71.       }
  72.     }
  73.   }
  74.   
  75.   /**
  76.    * A reducer class that just emits the sum of the input values.
  77.    */
  78.   public static class Reduce extends MapReduceBase
  79.     implements Reducer<Text, IntWritable, Text, IntWritable> {
  80.     
  81.     public void reduce(Text key, Iterator<IntWritable> values,
  82.                        OutputCollector<Text, IntWritable> output, 
  83.                        Reporter reporter) throws IOException {
  84.       int sum = 0;
  85.       while (values.hasNext()) {
  86.         sum += values.next().get();
  87.       }
  88.       output.collect(key, new IntWritable(sum));
  89.     }
  90.   }
  91.   
  92.   static int printUsage() {
  93.     System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
  94.     ToolRunner.printGenericCommandUsage(System.out);
  95.     return -1;
  96.   }
  97.   
  98.   /**
  99.    * The main driver for word count map/reduce program.
  100.    * Invoke this method to submit the map/reduce job.
  101.    * @throws IOException When there is communication problems with the 
  102.    *                     job tracker.
  103.    */
  104.   public int run(String[] args) throws Exception {
  105.     JobConf conf = new JobConf(getConf(), WordCount.class);
  106.     conf.setJobName("wordcount");
  107.  
  108.     // the keys are words (strings)
  109.     conf.setOutputKeyClass(Text.class);
  110.     // the values are counts (ints)
  111.     conf.setOutputValueClass(IntWritable.class);
  112.     
  113.     conf.setMapperClass(MapClass.class);        
  114.     conf.setCombinerClass(Reduce.class);
  115.     conf.setReducerClass(Reduce.class);
  116.     
  117.     List<String> other_args = new ArrayList<String>();
  118.     for(int i=0; i < args.length; ++i) {
  119.       try {
  120.         if ("-m".equals(args[i])) {
  121.           conf.setNumMapTasks(Integer.parseInt(args[++i]));
  122.         } else if ("-r".equals(args[i])) {
  123.           conf.setNumReduceTasks(Integer.parseInt(args[++i]));
  124.         } else {
  125.           other_args.add(args[i]);
  126.         }
  127.       } catch (NumberFormatException except) {
  128.         System.out.println("ERROR: Integer expected instead of " + args[i]);
  129.         return printUsage();
  130.       } catch (ArrayIndexOutOfBoundsException except) {
  131.         System.out.println("ERROR: Required parameter missing from " +
  132.                            args[i-1]);
  133.         return printUsage();
  134.       }
  135.     }
  136.     // Make sure there are exactly 2 parameters left.
  137.     if (other_args.size() != 2) {
  138.       System.out.println("ERROR: Wrong number of parameters: " +
  139.                          other_args.size() + " instead of 2.");
  140.       return printUsage();
  141.     }
  142.     FileInputFormat.setInputPaths(conf, other_args.get(0));
  143.     FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
  144.         
  145.     JobClient.runJob(conf);
  146.     return 0;
  147.   }
  148.   
  149.   
  150.   public static void main(String[] args) throws Exception {
  151.     int res = ToolRunner.run(new Configuration(), new WordCount(), args);
  152.     System.exit(res);
  153.   }
  154. }