wordcount-part.cc
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:2k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. #include "hadoop/Pipes.hh"
  19. #include "hadoop/TemplateFactory.hh"
  20. #include "hadoop/StringUtils.hh"
  21. const std::string WORDCOUNT = "WORDCOUNT";
  22. const std::string INPUT_WORDS = "INPUT_WORDS";
  23. const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
  24. class WordCountMap: public HadoopPipes::Mapper {
  25. public:
  26.   HadoopPipes::TaskContext::Counter* inputWords;
  27.   
  28.   WordCountMap(HadoopPipes::TaskContext& context) {
  29.     inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  30.   }
  31.   
  32.   void map(HadoopPipes::MapContext& context) {
  33.     std::vector<std::string> words = 
  34.       HadoopUtils::splitString(context.getInputValue(), " ");
  35.     for(unsigned int i=0; i < words.size(); ++i) {
  36.       context.emit(words[i], "1");
  37.     }
  38.     context.incrementCounter(inputWords, words.size());
  39.   }
  40. };
  41. class WordCountReduce: public HadoopPipes::Reducer {
  42. public:
  43.   HadoopPipes::TaskContext::Counter* outputWords;
  44.   WordCountReduce(HadoopPipes::TaskContext& context) {
  45.     outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  46.   }
  47.   void reduce(HadoopPipes::ReduceContext& context) {
  48.     int sum = 0;
  49.     while (context.nextValue()) {
  50.       sum += HadoopUtils::toInt(context.getInputValue());
  51.     }
  52.     context.emit(context.getInputKey(), HadoopUtils::toString(sum));
  53.     context.incrementCounter(outputWords, 1); 
  54.   }
  55. };
  56. class WordCountPartitioner: public HadoopPipes::Partitioner {
  57. public:
  58.   WordCountPartitioner(HadoopPipes::TaskContext& context){}
  59.   virtual int partition(const std::string& key, int numOfReduces) {
  60.     return 0;
  61.   }
  62. };
  63. int main(int argc, char *argv[]) {
  64.   return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap, 
  65.                               WordCountReduce,WordCountPartitioner,
  66.                               WordCountReduce>());
  67. }