sort.cc
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. #include "hadoop/Pipes.hh"
  19. #include "hadoop/TemplateFactory.hh"
  20. class SortMap: public HadoopPipes::Mapper {
  21. private:
  22.   /* the fraction 0.0 to 1.0 of records to keep */
  23.   float keepFraction;
  24.   /* the number of records kept so far */
  25.   long long keptRecords;
  26.   /* the total number of records */
  27.   long long totalRecords;
  28.   static const std::string MAP_KEEP_PERCENT;
  29. public:
  30.   /*
  31.    * Look in the config to find the fraction of records to keep.
  32.    */
  33.   SortMap(HadoopPipes::TaskContext& context){
  34.     const HadoopPipes::JobConf* conf = context.getJobConf();
  35.     if (conf->hasKey(MAP_KEEP_PERCENT)) {
  36.       keepFraction = conf->getFloat(MAP_KEEP_PERCENT) / 100.0;
  37.     } else {
  38.       keepFraction = 1.0;
  39.     }
  40.     keptRecords = 0;
  41.     totalRecords = 0;
  42.   }
  43.   void map(HadoopPipes::MapContext& context) {
  44.     totalRecords += 1;
  45.     while ((float) keptRecords / totalRecords < keepFraction) {
  46.       keptRecords += 1;
  47.       context.emit(context.getInputKey(), context.getInputValue());
  48.     }
  49.   }
  50. };
  51. const std::string SortMap::MAP_KEEP_PERCENT("hadoop.sort.map.keep.percent");
  52. class SortReduce: public HadoopPipes::Reducer {
  53. private:
  54.   /* the fraction 0.0 to 1.0 of records to keep */
  55.   float keepFraction;
  56.   /* the number of records kept so far */
  57.   long long keptRecords;
  58.   /* the total number of records */
  59.   long long totalRecords;
  60.   static const std::string REDUCE_KEEP_PERCENT;
  61. public:
  62.   SortReduce(HadoopPipes::TaskContext& context){
  63.     const HadoopPipes::JobConf* conf = context.getJobConf();
  64.     if (conf->hasKey(REDUCE_KEEP_PERCENT)) {
  65.       keepFraction = conf->getFloat(REDUCE_KEEP_PERCENT) / 100.0;
  66.     } else {
  67.       keepFraction = 1.0;
  68.     }
  69.     keptRecords = 0;
  70.     totalRecords = 0;
  71.   }
  72.   void reduce(HadoopPipes::ReduceContext& context) {
  73.     while (context.nextValue()) {
  74.       totalRecords += 1;
  75.       while ((float) keptRecords / totalRecords < keepFraction) {
  76.         keptRecords += 1;
  77.         context.emit(context.getInputKey(), context.getInputValue());
  78.       }
  79.     }
  80.   }
  81. };
  82. const std::string 
  83.   SortReduce::REDUCE_KEEP_PERCENT("hadoop.sort.reduce.keep.percent");
  84. int main(int argc, char *argv[]) {
  85.   return HadoopPipes::runTask(HadoopPipes::TemplateFactory<SortMap,
  86.                                                            SortReduce>());
  87. }