CombinerJobCreator.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.Iterator;
  21. import java.util.StringTokenizer;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.io.IntWritable;
  24. import org.apache.hadoop.io.LongWritable;
  25. import org.apache.hadoop.io.Text;
  26. public class CombinerJobCreator {
  27.    public static class MapClass extends MapReduceBase
  28.      implements Mapper<LongWritable, Text, Text, IntWritable> {
  29.      private final static IntWritable one = new IntWritable(1);
  30.      private Text word = new Text();
  31.      public void map(LongWritable key, Text value,
  32.                      OutputCollector<Text, IntWritable> output,
  33.                      Reporter reporter) throws IOException {
  34.        String line = value.toString();
  35.        StringTokenizer itr = new StringTokenizer(line);
  36.        while (itr.hasMoreTokens()) {
  37.          word.set(itr.nextToken());
  38.          output.collect(word, one);
  39.        }
  40.      }
  41.    }
  42.    public static class Reduce extends MapReduceBase
  43.      implements Reducer<Text, IntWritable, Text, IntWritable> {
  44.      public void reduce(Text key, Iterator<IntWritable> values,
  45.                         OutputCollector<Text, IntWritable> output,
  46.                         Reporter reporter) throws IOException {
  47.        int sum = 0;
  48.        while (values.hasNext()) {
  49.          sum += values.next().get();
  50.        }
  51.        output.collect(key, new IntWritable(sum));
  52.      }
  53.    }
  54.   public static JobConf createJob(String[] args) throws Exception {
  55.     JobConf conf = new JobConf(CombinerJobCreator.class);
  56.     conf.setJobName("GridmixCombinerJob");
  57.     // the keys are words (strings)
  58.     conf.setOutputKeyClass(Text.class);
  59.     // the values are counts (ints)
  60.     conf.setOutputValueClass(IntWritable.class);
  61.     conf.setMapperClass(MapClass.class);
  62.     conf.setCombinerClass(Reduce.class);
  63.     conf.setReducerClass(Reduce.class);
  64.     boolean mapoutputCompressed = false;
  65.     boolean outputCompressed = false;
  66.     // List<String> other_args = new ArrayList<String>();
  67.     for (int i = 0; i < args.length; ++i) {
  68.       try {
  69.         if ("-r".equals(args[i])) {
  70.           conf.setNumReduceTasks(Integer.parseInt(args[++i]));
  71.         } else if ("-indir".equals(args[i])) {
  72.           FileInputFormat.setInputPaths(conf, args[++i]);
  73.         } else if ("-outdir".equals(args[i])) {
  74.           FileOutputFormat.setOutputPath(conf, new Path(args[++i]));
  75.         } else if ("-mapoutputCompressed".equals(args[i])) {
  76.           mapoutputCompressed = Boolean.valueOf(args[++i]).booleanValue();
  77.         } else if ("-outputCompressed".equals(args[i])) {
  78.           outputCompressed = Boolean.valueOf(args[++i]).booleanValue();
  79.         }
  80.       } catch (NumberFormatException except) {
  81.         System.out.println("ERROR: Integer expected instead of " + args[i]);
  82.         return null;
  83.       } catch (ArrayIndexOutOfBoundsException except) {
  84.         System.out.println("ERROR: Required parameter missing from "
  85.             + args[i - 1]);
  86.         return null;
  87.       }
  88.     }
  89.     conf.setCompressMapOutput(mapoutputCompressed);
  90.     conf.setBoolean("mapred.output.compress", outputCompressed);
  91.     return conf;
  92.   }
  93. }