wordcount-part.cc
Uploaded by: quxuerui
Upload date: 2018-01-08
Package size: 41811k
File size: 2k
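This is the classic Hadoop Pipes word-count example in C++, extended with a custom partitioner: a mapper tokenizes each input line and emits (word, "1") pairs, a reducer sums the counts per word, and a trivial partitioner routes every key to reduce task 0. Per-task counters (WORDCOUNT/INPUT_WORDS and WORDCOUNT/OUTPUT_WORDS) track how many words flow through the job.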
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

// Counter group and counter names used to track word statistics.
const std::string WORDCOUNT = "WORDCOUNT";
const std::string INPUT_WORDS = "INPUT_WORDS";
const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
class WordCountMap: public HadoopPipes::Mapper {
public:
  HadoopPipes::TaskContext::Counter* inputWords;

  WordCountMap(HadoopPipes::TaskContext& context) {
    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  }

  // Tokenize each input line on spaces and emit (word, "1") for every token.
  void map(HadoopPipes::MapContext& context) {
    std::vector<std::string> words =
      HadoopUtils::splitString(context.getInputValue(), " ");
    for (unsigned int i = 0; i < words.size(); ++i) {
      context.emit(words[i], "1");
    }
    context.incrementCounter(inputWords, words.size());
  }
};
class WordCountReduce: public HadoopPipes::Reducer {
public:
  HadoopPipes::TaskContext::Counter* outputWords;

  WordCountReduce(HadoopPipes::TaskContext& context) {
    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  }

  // Sum the "1" values emitted for each word and emit (word, total).
  void reduce(HadoopPipes::ReduceContext& context) {
    int sum = 0;
    while (context.nextValue()) {
      sum += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(sum));
    context.incrementCounter(outputWords, 1);
  }
};
class WordCountPartitioner: public HadoopPipes::Partitioner {
public:
  WordCountPartitioner(HadoopPipes::TaskContext& context) {}

  // Route every key to reduce task 0. This only demonstrates the
  // partitioner hook; a hash-based variant that spreads keys across all
  // reducers is sketched after the listing.
  virtual int partition(const std::string& key, int numOfReduces) {
    return 0;
  }
};
int main(int argc, char *argv[]) {
  // The TemplateFactory parameters are, in order: mapper, reducer,
  // partitioner, and combiner (WordCountReduce doubles as the combiner).
  return HadoopPipes::runTask(
      HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce,
                                   WordCountPartitioner,
                                   WordCountReduce>());
}
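Because partition() always returns 0, every key lands on a single reduce task, which defeats parallelism in the reduce phase. A more realistic partitioner hashes the key, in the spirit of Hadoop's default Java HashPartitioner. The following is a minimal sketch; the class name and the 31-multiplier polynomial hash are illustrative assumptions, not part of the original example:

// Sketch: spread keys across all reduce tasks by hashing the key's bytes.
class HashWordCountPartitioner: public HadoopPipes::Partitioner {
public:
  HashWordCountPartitioner(HadoopPipes::TaskContext& context) {}
  virtual int partition(const std::string& key, int numOfReduces) {
    unsigned int h = 0;
    for (unsigned int i = 0; i < key.size(); ++i) {
      h = h * 31 + static_cast<unsigned char>(key[i]);  // polynomial hash
    }
    return static_cast<int>(h % numOfReduces);  // bucket into [0, numOfReduces)
  }
};

To build and run, the standard Hadoop Pipes workflow applies (exact include and library paths depend on the installation and are assumptions here): compile with g++ against libhadooppipes and libhadooputils (plus -lpthread), copy the binary into HDFS, and submit with hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input <input dir> -output <output dir> -program <HDFS path to binary>. The two -D properties tell Pipes to use the framework's Java record reader and writer, since this program does not supply its own.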