WordCount.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:2k
源码类别:

网格计算

开发平台:

Java

  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements.  See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership.  The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License.  You may obtain a copy of the License at
  9. #
  10. #     http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. #
  18. from org.apache.hadoop.fs import Path
  19. from org.apache.hadoop.io import *
  20. from org.apache.hadoop.mapred import *
  21. import sys
  22. import getopt
  23. class WordCountMap(Mapper, MapReduceBase):
  24.     one = IntWritable(1)
  25.     def map(self, key, value, output, reporter):
  26.         for w in value.toString().split():
  27.             output.collect(Text(w), self.one)
  28. class Summer(Reducer, MapReduceBase):
  29.     def reduce(self, key, values, output, reporter):
  30.         sum = 0
  31.         while values.hasNext():
  32.             sum += values.next().get()
  33.         output.collect(key, IntWritable(sum))
  34. def printUsage(code):
  35.     print "wordcount [-m <maps>] [-r <reduces>] <input> <output>"
  36.     sys.exit(code)
  37. def main(args):
  38.     conf = JobConf(WordCountMap);
  39.     conf.setJobName("wordcount");
  40.  
  41.     conf.setOutputKeyClass(Text);
  42.     conf.setOutputValueClass(IntWritable);
  43.     
  44.     conf.setMapperClass(WordCountMap);        
  45.     conf.setCombinerClass(Summer);
  46.     conf.setReducerClass(Summer);
  47.     try:
  48.         flags, other_args = getopt.getopt(args[1:], "m:r:")
  49.     except getopt.GetoptError:
  50.         printUsage(1)
  51.     if len(other_args) != 2:
  52.         printUsage(1)
  53.     
  54.     for f,v in flags:
  55.         if f == "-m":
  56.             conf.setNumMapTasks(int(v))
  57.         elif f == "-r":
  58.             conf.setNumReduceTasks(int(v))
  59.     conf.setInputPath(Path(other_args[0]))
  60.     conf.setOutputPath(Path(other_args[1]))
  61.     JobClient.runJob(conf);
  62. if __name__ == "__main__":
  63.     main(sys.argv)