wordcount-nopipe.cc
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. #include "hadoop/Pipes.hh"
  19. #include "hadoop/TemplateFactory.hh"
  20. #include "hadoop/StringUtils.hh"
  21. #include "hadoop/SerialUtils.hh"
  22. #include <stdio.h>
  23. #include <sys/types.h>
  24. #include <sys/stat.h>
  25. const std::string WORDCOUNT = "WORDCOUNT";
  26. const std::string INPUT_WORDS = "INPUT_WORDS";
  27. const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
  28. class WordCountMap: public HadoopPipes::Mapper {
  29. public:
  30.   HadoopPipes::TaskContext::Counter* inputWords;
  31.   
  32.   WordCountMap(HadoopPipes::TaskContext& context) {
  33.     inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  34.   }
  35.   
  36.   void map(HadoopPipes::MapContext& context) {
  37.     std::vector<std::string> words = 
  38.       HadoopUtils::splitString(context.getInputValue(), " ");
  39.     for(unsigned int i=0; i < words.size(); ++i) {
  40.       context.emit(words[i], "1");
  41.     }
  42.     context.incrementCounter(inputWords, words.size());
  43.   }
  44. };
  45. class WordCountReduce: public HadoopPipes::Reducer {
  46. public:
  47.   HadoopPipes::TaskContext::Counter* outputWords;
  48.   WordCountReduce(HadoopPipes::TaskContext& context) {
  49.     outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  50.   }
  51.   void reduce(HadoopPipes::ReduceContext& context) {
  52.     int sum = 0;
  53.     while (context.nextValue()) {
  54.       sum += HadoopUtils::toInt(context.getInputValue());
  55.     }
  56.     context.emit(context.getInputKey(), HadoopUtils::toString(sum));
  57.     context.incrementCounter(outputWords, 1); 
  58.   }
  59. };
  60. class WordCountReader: public HadoopPipes::RecordReader {
  61. private:
  62.   int64_t bytesTotal;
  63.   int64_t bytesRead;
  64.   FILE* file;
  65. public:
  66.   WordCountReader(HadoopPipes::MapContext& context) {
  67.     std::string filename;
  68.     HadoopUtils::StringInStream stream(context.getInputSplit());
  69.     HadoopUtils::deserializeString(filename, stream);
  70.     struct stat statResult;
  71.     stat(filename.c_str(), &statResult);
  72.     bytesTotal = statResult.st_size;
  73.     bytesRead = 0;
  74.     file = fopen(filename.c_str(), "rt");
  75.     HADOOP_ASSERT(file != NULL, "failed to open " + filename);
  76.   }
  77.   ~WordCountReader() {
  78.     fclose(file);
  79.   }
  80.   virtual bool next(std::string& key, std::string& value) {
  81.     key = HadoopUtils::toString(ftell(file));
  82.     int ch = getc(file);
  83.     bytesRead += 1;
  84.     value.clear();
  85.     while (ch != -1 && ch != 'n') {
  86.       value += ch;
  87.       ch = getc(file);
  88.       bytesRead += 1;
  89.     }
  90.     return ch != -1;
  91.   }
  92.   /**
  93.    * The progress of the record reader through the split as a value between
  94.    * 0.0 and 1.0.
  95.    */
  96.   virtual float getProgress() {
  97.     if (bytesTotal > 0) {
  98.       return (float)bytesRead / bytesTotal;
  99.     } else {
  100.       return 1.0f;
  101.     }
  102.   }
  103. };
  104. class WordCountWriter: public HadoopPipes::RecordWriter {
  105. private:
  106.   FILE* file;
  107. public:
  108.   WordCountWriter(HadoopPipes::ReduceContext& context) {
  109.     const HadoopPipes::JobConf* job = context.getJobConf();
  110.     int part = job->getInt("mapred.task.partition");
  111.     std::string outDir = job->get("mapred.work.output.dir");
  112.     // remove the file: schema substring
  113.     std::string::size_type posn = outDir.find(":");
  114.     HADOOP_ASSERT(posn != std::string::npos, 
  115.                   "no schema found in output dir: " + outDir);
  116.     outDir.erase(0, posn+1);
  117.     mkdir(outDir.c_str(), 0777);
  118.     std::string outFile = outDir + "/part-" + HadoopUtils::toString(part);
  119.     file = fopen(outFile.c_str(), "wt");
  120.     HADOOP_ASSERT(file != NULL, "can't open file for writing: " + outFile);
  121.   }
  122.   ~WordCountWriter() {
  123.     fclose(file);
  124.   }
  125.   void emit(const std::string& key, const std::string& value) {
  126.     fprintf(file, "%s -> %sn", key.c_str(), value.c_str());
  127.   }
  128. };
  129. int main(int argc, char *argv[]) {
  130.   return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap, 
  131.                               WordCountReduce, void, void, WordCountReader,
  132.                               WordCountWriter>());
  133. }