JythonAbacus.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别: 网格计算
开发平台: Java

  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements.  See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership.  The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License.  You may obtain a copy of the License at
  9. #
  10. #     http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. #
  18. from org.apache.hadoop.fs import Path
  19. from org.apache.hadoop.io import *
  20. from org.apache.hadoop.mapred import *
  21. from org.apache.hadoop.abacus import *
  22. from java.util import *;
  23. import sys
  24. class AbacusMapper(ValueAggregatorMapper):
  25.     def map(self, key, value, output, reporter):
  26.         ValueAggregatorMapper.map(self, key, value, output, reporter);
  27. class AbacusReducer(ValueAggregatorReducer):
  28.     def reduce(self, key, values, output, reporter):
  29.         ValueAggregatorReducer.reduce(self, key, values, output, reporter);
  30. class AbacusCombiner(ValueAggregatorCombiner):
  31.     def reduce(self, key, values, output, reporter):
  32.         ValueAggregatorCombiner.reduce(self, key, values, output, reporter);
  33. def printUsage(code):
  34.     print "Abacus <input> <output> <numOfReducers> <inputformat> <specfile>"
  35.     sys.exit(code)
  36. def main(args):
  37.     if len(args) < 6:
  38.         printUsage(1);
  39.     inDir = args[1];
  40.     outDir = args[2];
  41.     numOfReducers = int(args[3]);
  42.     theInputFormat = args[4];
  43.     specFile = args[5];
  44.                                         
  45.     print "numOfReducers: ", numOfReducers, "theInputFormat: ", theInputFormat, "specFile: ", specFile
  46.     conf = JobConf(AbacusMapper);
  47.     conf.setJobName("recordcount");
  48.     conf.addDefaultResource(Path(specFile));
  49.  
  50.     if theInputFormat=="textinputformat":
  51.         conf.setInputFormat(TextInputFormat);
  52.     else:
  53.         conf.setInputFormat(SequenceFileInputFormat);
  54.     conf.setOutputFormat(TextOutputFormat);
  55.     conf.setMapOutputKeyClass(Text);
  56.     conf.setMapOutputValueClass(Text);
  57.     conf.setOutputKeyClass(Text);
  58.     conf.setOutputValueClass(Text);
  59.     conf.setNumMapTasks(1);
  60.     conf.setNumReduceTasks(numOfReducers);
  61.     conf.setMapperClass(AbacusMapper);        
  62.     conf.setCombinerClass(AbacusCombiner);
  63.     conf.setReducerClass(AbacusReducer);
  64.     conf.setInputPath(Path(args[1]))
  65.     conf.setOutputPath(Path(args[2]))
  66.     JobClient.runJob(conf);
  67. if __name__ == "__main__":
  68.     main(sys.argv)