HadoopPipes.cc
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:31k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. #include "hadoop/Pipes.hh"
  19. #include "hadoop/SerialUtils.hh"
  20. #include "hadoop/StringUtils.hh"
  21. #include <map>
  22. #include <vector>
  23. #include <errno.h>
  24. #include <netinet/in.h>
  25. #include <stdint.h>
  26. #include <stdio.h>
  27. #include <stdlib.h>
  28. #include <strings.h>
  29. #include <sys/socket.h>
  30. #include <pthread.h>
  31. using std::map;
  32. using std::string;
  33. using std::vector;
  34. using namespace HadoopUtils;
  35. namespace HadoopPipes {
  36.   class JobConfImpl: public JobConf {
  37.   private:
  38.     map<string, string> values;
  39.   public:
  40.     void set(const string& key, const string& value) {
  41.       values[key] = value;
  42.     }
  43.     virtual bool hasKey(const string& key) const {
  44.       return values.find(key) != values.end();
  45.     }
  46.     virtual const string& get(const string& key) const {
  47.       map<string,string>::const_iterator itr = values.find(key);
  48.       if (itr == values.end()) {
  49.         throw Error("Key " + key + " not found in JobConf");
  50.       }
  51.       return itr->second;
  52.     }
  53.     virtual int getInt(const string& key) const {
  54.       const string& val = get(key);
  55.       return toInt(val);
  56.     }
  57.     virtual float getFloat(const string& key) const {
  58.       const string& val = get(key);
  59.       return toFloat(val);
  60.     }
  61.     virtual bool getBoolean(const string&key) const {
  62.       const string& val = get(key);
  63.       return toBool(val);
  64.     }
  65.   };
  66.   class DownwardProtocol {
  67.   public:
  68.     virtual void start(int protocol) = 0;
  69.     virtual void setJobConf(vector<string> values) = 0;
  70.     virtual void setInputTypes(string keyType, string valueType) = 0;
  71.     virtual void runMap(string inputSplit, int numReduces, bool pipedInput)= 0;
  72.     virtual void mapItem(const string& key, const string& value) = 0;
  73.     virtual void runReduce(int reduce, bool pipedOutput) = 0;
  74.     virtual void reduceKey(const string& key) = 0;
  75.     virtual void reduceValue(const string& value) = 0;
  76.     virtual void close() = 0;
  77.     virtual void abort() = 0;
  78.     virtual ~DownwardProtocol() {}
  79.   };
  80.   class UpwardProtocol {
  81.   public:
  82.     virtual void output(const string& key, const string& value) = 0;
  83.     virtual void partitionedOutput(int reduce, const string& key,
  84.                                    const string& value) = 0;
  85.     virtual void status(const string& message) = 0;
  86.     virtual void progress(float progress) = 0;
  87.     virtual void done() = 0;
  88.     virtual void registerCounter(int id, const string& group, 
  89.                                  const string& name) = 0;
  90.     virtual void 
  91.       incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
  92.     virtual ~UpwardProtocol() {}
  93.   };
  94.   class Protocol {
  95.   public:
  96.     virtual void nextEvent() = 0;
  97.     virtual UpwardProtocol* getUplink() = 0;
  98.     virtual ~Protocol() {}
  99.   };
  100.   class TextUpwardProtocol: public UpwardProtocol {
  101.   private:
  102.     FILE* stream;
  103.     static const char fieldSeparator = 't';
  104.     static const char lineSeparator = 'n';
  105.     void writeBuffer(const string& buffer) {
  106.       fprintf(stream, quoteString(buffer, "tn").c_str());
  107.     }
  108.   public:
  109.     TextUpwardProtocol(FILE* _stream): stream(_stream) {}
  110.     
  111.     virtual void output(const string& key, const string& value) {
  112.       fprintf(stream, "output%c", fieldSeparator);
  113.       writeBuffer(key);
  114.       fprintf(stream, "%c", fieldSeparator);
  115.       writeBuffer(value);
  116.       fprintf(stream, "%c", lineSeparator);
  117.     }
  118.     virtual void partitionedOutput(int reduce, const string& key,
  119.                                    const string& value) {
  120.       fprintf(stream, "parititionedOutput%c%d%c", fieldSeparator, reduce, 
  121.               fieldSeparator);
  122.       writeBuffer(key);
  123.       fprintf(stream, "%c", fieldSeparator);
  124.       writeBuffer(value);
  125.       fprintf(stream, "%c", lineSeparator);
  126.     }
  127.     virtual void status(const string& message) {
  128.       fprintf(stream, "status%c%s%c", fieldSeparator, message.c_str(), 
  129.               lineSeparator);
  130.     }
  131.     virtual void progress(float progress) {
  132.       fprintf(stream, "progress%c%f%c", fieldSeparator, progress, 
  133.               lineSeparator);
  134.     }
  135.     virtual void registerCounter(int id, const string& group, 
  136.                                  const string& name) {
  137.       fprintf(stream, "registerCounter%c%d%c%s%c%s%c", fieldSeparator, id,
  138.               fieldSeparator, group.c_str(), fieldSeparator, name.c_str(), 
  139.               lineSeparator);
  140.     }
  141.     virtual void incrementCounter(const TaskContext::Counter* counter, 
  142.                                   uint64_t amount) {
  143.       fprintf(stream, "incrCounter%c%d%c%ld%c", fieldSeparator, counter->getId(), 
  144.               fieldSeparator, (long)amount, lineSeparator);
  145.     }
  146.     
  147.     virtual void done() {
  148.       fprintf(stream, "done%c", lineSeparator);
  149.     }
  150.   };
  151.   class TextProtocol: public Protocol {
  152.   private:
  153.     FILE* downStream;
  154.     DownwardProtocol* handler;
  155.     UpwardProtocol* uplink;
  156.     string key;
  157.     string value;
  158.     int readUpto(string& buffer, const char* limit) {
  159.       int ch;
  160.       buffer.clear();
  161.       while ((ch = getc(downStream)) != -1) {
  162.         if (strchr(limit, ch) != NULL) {
  163.           return ch;
  164.         }
  165.         buffer += ch;
  166.       }
  167.       return -1;
  168.     }
  169.     static const char* delim;
  170.   public:
  171.     TextProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
  172.       downStream = down;
  173.       uplink = new TextUpwardProtocol(up);
  174.       handler = _handler;
  175.     }
  176.     UpwardProtocol* getUplink() {
  177.       return uplink;
  178.     }
  179.     virtual void nextEvent() {
  180.       string command;
  181.       string arg;
  182.       int sep;
  183.       sep = readUpto(command, delim);
  184.       if (command == "mapItem") {
  185.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  186.         sep = readUpto(key, delim);
  187.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  188.         sep = readUpto(value, delim);
  189.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  190.         handler->mapItem(key, value);
  191.       } else if (command == "reduceValue") {
  192.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  193.         sep = readUpto(value, delim);
  194.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  195.         handler->reduceValue(value);
  196.       } else if (command == "reduceKey") {
  197.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  198.         sep = readUpto(key, delim);
  199.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  200.         handler->reduceKey(key);
  201.       } else if (command == "start") {
  202.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  203.         sep = readUpto(arg, delim);
  204.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  205.         handler->start(toInt(arg));
  206.       } else if (command == "setJobConf") {
  207.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  208.         sep = readUpto(arg, delim);
  209.         int len = toInt(arg);
  210.         vector<string> values(len);
  211.         for(int i=0; i < len; ++i) {
  212.           HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  213.           sep = readUpto(arg, delim);
  214.           values.push_back(arg);
  215.         }
  216.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  217.         handler->setJobConf(values);
  218.       } else if (command == "setInputTypes") {
  219.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  220.         sep = readUpto(key, delim);
  221.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  222.         sep = readUpto(value, delim);
  223.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  224.         handler->setInputTypes(key, value);
  225.       } else if (command == "runMap") {
  226.         string split;
  227.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  228.         sep = readUpto(split, delim);
  229.         string reduces;
  230.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  231.         sep = readUpto(reduces, delim);
  232.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  233.         sep = readUpto(arg, delim);
  234.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  235.         handler->runMap(split, toInt(reduces), toBool(arg));
  236.       } else if (command == "runReduce") {
  237.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  238.         sep = readUpto(arg, delim);
  239.         HADOOP_ASSERT(sep == 't', "Short text protocol command " + command);
  240.         string piped;
  241.         sep = readUpto(piped, delim);
  242.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  243.         handler->runReduce(toInt(arg), toBool(piped));
  244.       } else if (command == "abort") { 
  245.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  246.         handler->abort();
  247.       } else if (command == "close") {
  248.         HADOOP_ASSERT(sep == 'n', "Long text protocol command " + command);
  249.         handler->close();
  250.       } else {
  251.         throw Error("Illegal text protocol command " + command);
  252.       }
  253.     }
  254.     ~TextProtocol() {
  255.       delete uplink;
  256.     }
  257.   };
  258.   const char* TextProtocol::delim = "tn";
  259.   enum MESSAGE_TYPE {START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP, 
  260.                      MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE, 
  261.                      CLOSE, ABORT, 
  262.                      OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE,
  263.                      REGISTER_COUNTER, INCREMENT_COUNTER};
  264.   class BinaryUpwardProtocol: public UpwardProtocol {
  265.   private:
  266.     FileOutStream* stream;
  267.   public:
  268.     BinaryUpwardProtocol(FILE* _stream) {
  269.       stream = new FileOutStream();
  270.       HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
  271.     }
  272.     virtual void output(const string& key, const string& value) {
  273.       serializeInt(OUTPUT, *stream);
  274.       serializeString(key, *stream);
  275.       serializeString(value, *stream);
  276.     }
  277.     virtual void partitionedOutput(int reduce, const string& key,
  278.                                    const string& value) {
  279.       serializeInt(PARTITIONED_OUTPUT, *stream);
  280.       serializeInt(reduce, *stream);
  281.       serializeString(key, *stream);
  282.       serializeString(value, *stream);
  283.     }
  284.     virtual void status(const string& message) {
  285.       serializeInt(STATUS, *stream);
  286.       serializeString(message, *stream);
  287.     }
  288.     virtual void progress(float progress) {
  289.       serializeInt(PROGRESS, *stream);
  290.       serializeFloat(progress, *stream);
  291.       stream->flush();
  292.     }
  293.     virtual void done() {
  294.       serializeInt(DONE, *stream);
  295.     }
  296.     virtual void registerCounter(int id, const string& group, 
  297.                                  const string& name) {
  298.       serializeInt(REGISTER_COUNTER, *stream);
  299.       serializeInt(id, *stream);
  300.       serializeString(group, *stream);
  301.       serializeString(name, *stream);
  302.     }
  303.     virtual void incrementCounter(const TaskContext::Counter* counter, 
  304.                                   uint64_t amount) {
  305.       serializeInt(INCREMENT_COUNTER, *stream);
  306.       serializeInt(counter->getId(), *stream);
  307.       serializeLong(amount, *stream);
  308.     }
  309.     
  310.     ~BinaryUpwardProtocol() {
  311.       delete stream;
  312.     }
  313.   };
  314.   class BinaryProtocol: public Protocol {
  315.   private:
  316.     FileInStream* downStream;
  317.     DownwardProtocol* handler;
  318.     BinaryUpwardProtocol * uplink;
  319.     string key;
  320.     string value;
  321.   public:
  322.     BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
  323.       downStream = new FileInStream();
  324.       downStream->open(down);
  325.       uplink = new BinaryUpwardProtocol(up);
  326.       handler = _handler;
  327.     }
  328.     UpwardProtocol* getUplink() {
  329.       return uplink;
  330.     }
  331.     virtual void nextEvent() {
  332.       int32_t cmd;
  333.       cmd = deserializeInt(*downStream);
  334.       switch (cmd) {
  335.       case START_MESSAGE: {
  336.         int32_t prot;
  337.         prot = deserializeInt(*downStream);
  338.         handler->start(prot);
  339.         break;
  340.       }
  341.       case SET_JOB_CONF: {
  342.         int32_t entries;
  343.         entries = deserializeInt(*downStream);
  344.         vector<string> result(entries);
  345.         for(int i=0; i < entries; ++i) {
  346.           string item;
  347.           deserializeString(item, *downStream);
  348.           result.push_back(item);
  349.         }
  350.         handler->setJobConf(result);
  351.         break;
  352.       }
  353.       case SET_INPUT_TYPES: {
  354.         string keyType;
  355.         string valueType;
  356.         deserializeString(keyType, *downStream);
  357.         deserializeString(valueType, *downStream);
  358.         handler->setInputTypes(keyType, valueType);
  359.         break;
  360.       }
  361.       case RUN_MAP: {
  362.         string split;
  363.         int32_t numReduces;
  364.         int32_t piped;
  365.         deserializeString(split, *downStream);
  366.         numReduces = deserializeInt(*downStream);
  367.         piped = deserializeInt(*downStream);
  368.         handler->runMap(split, numReduces, piped);
  369.         break;
  370.       }
  371.       case MAP_ITEM: {
  372.         deserializeString(key, *downStream);
  373.         deserializeString(value, *downStream);
  374.         handler->mapItem(key, value);
  375.         break;
  376.       }
  377.       case RUN_REDUCE: {
  378.         int32_t reduce;
  379.         int32_t piped;
  380.         reduce = deserializeInt(*downStream);
  381.         piped = deserializeInt(*downStream);
  382.         handler->runReduce(reduce, piped);
  383.         break;
  384.       }
  385.       case REDUCE_KEY: {
  386.         deserializeString(key, *downStream);
  387.         handler->reduceKey(key);
  388.         break;
  389.       }
  390.       case REDUCE_VALUE: {
  391.         deserializeString(value, *downStream);
  392.         handler->reduceValue(value);
  393.         break;
  394.       }
  395.       case CLOSE:
  396.         handler->close();
  397.         break;
  398.       case ABORT:
  399.         handler->abort();
  400.         break;
  401.       default:
  402.         HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
  403.       }
  404.     }
  405.     virtual ~BinaryProtocol() {
  406.       delete downStream;
  407.       delete uplink;
  408.     }
  409.   };
  410.   /**
  411.    * Define a context object to give to combiners that will let them
  412.    * go through the values and emit their results correctly.
  413.    */
  414.   class CombineContext: public ReduceContext {
  415.   private:
  416.     ReduceContext* baseContext;
  417.     Partitioner* partitioner;
  418.     int numReduces;
  419.     UpwardProtocol* uplink;
  420.     bool firstKey;
  421.     bool firstValue;
  422.     map<string, vector<string> >::iterator keyItr;
  423.     map<string, vector<string> >::iterator endKeyItr;
  424.     vector<string>::iterator valueItr;
  425.     vector<string>::iterator endValueItr;
  426.   public:
  427.     CombineContext(ReduceContext* _baseContext,
  428.                    Partitioner* _partitioner,
  429.                    int _numReduces,
  430.                    UpwardProtocol* _uplink,
  431.                    map<string, vector<string> >& data) {
  432.       baseContext = _baseContext;
  433.       partitioner = _partitioner;
  434.       numReduces = _numReduces;
  435.       uplink = _uplink;
  436.       keyItr = data.begin();
  437.       endKeyItr = data.end();
  438.       firstKey = true;
  439.       firstValue = true;
  440.     }
  441.     virtual const JobConf* getJobConf() {
  442.       return baseContext->getJobConf();
  443.     }
  444.     virtual const std::string& getInputKey() {
  445.       return keyItr->first;
  446.     }
  447.     virtual const std::string& getInputValue() {
  448.       return *valueItr;
  449.     }
  450.     virtual void emit(const std::string& key, const std::string& value) {
  451.       if (partitioner != NULL) {
  452.         uplink->partitionedOutput(partitioner->partition(key, numReduces),
  453.                                   key, value);
  454.       } else {
  455.         uplink->output(key, value);
  456.       }
  457.     }
  458.     virtual void progress() {
  459.       baseContext->progress();
  460.     }
  461.     virtual void setStatus(const std::string& status) {
  462.       baseContext->setStatus(status);
  463.     }
  464.     bool nextKey() {
  465.       if (firstKey) {
  466.         firstKey = false;
  467.       } else {
  468.         ++keyItr;
  469.       }
  470.       if (keyItr != endKeyItr) {
  471.         valueItr = keyItr->second.begin();
  472.         endValueItr = keyItr->second.end();
  473.         firstValue = true;
  474.         return true;
  475.       }
  476.       return false;
  477.     }
  478.     virtual bool nextValue() {
  479.       if (firstValue) {
  480.         firstValue = false;
  481.       } else {
  482.         ++valueItr;
  483.       }
  484.       return valueItr != endValueItr;
  485.     }
  486.     
  487.     virtual Counter* getCounter(const std::string& group, 
  488.                                const std::string& name) {
  489.       return baseContext->getCounter(group, name);
  490.     }
  491.     virtual void incrementCounter(const Counter* counter, uint64_t amount) {
  492.       baseContext->incrementCounter(counter, amount);
  493.     }
  494.   };
  495.   /**
  496.    * A RecordWriter that will take the map outputs, buffer them up and then
  497.    * combine then when the buffer is full.
  498.    */
  499.   class CombineRunner: public RecordWriter {
  500.   private:
  501.     map<string, vector<string> > data;
  502.     int64_t spillSize;
  503.     int64_t numBytes;
  504.     ReduceContext* baseContext;
  505.     Partitioner* partitioner;
  506.     int numReduces;
  507.     UpwardProtocol* uplink;
  508.     Reducer* combiner;
  509.   public:
  510.     CombineRunner(int64_t _spillSize, ReduceContext* _baseContext, 
  511.                   Reducer* _combiner, UpwardProtocol* _uplink, 
  512.                   Partitioner* _partitioner, int _numReduces) {
  513.       numBytes = 0;
  514.       spillSize = _spillSize;
  515.       baseContext = _baseContext;
  516.       partitioner = _partitioner;
  517.       numReduces = _numReduces;
  518.       uplink = _uplink;
  519.       combiner = _combiner;
  520.     }
  521.     virtual void emit(const std::string& key,
  522.                       const std::string& value) {
  523.       numBytes += key.length() + value.length();
  524.       data[key].push_back(value);
  525.       if (numBytes >= spillSize) {
  526.         spillAll();
  527.       }
  528.     }
  529.     virtual void close() {
  530.       spillAll();
  531.     }
  532.   private:
  533.     void spillAll() {
  534.       CombineContext context(baseContext, partitioner, numReduces, 
  535.                              uplink, data);
  536.       while (context.nextKey()) {
  537.         combiner->reduce(context);
  538.       }
  539.       data.clear();
  540.     }
  541.   };
  542.   class TaskContextImpl: public MapContext, public ReduceContext, 
  543.                          public DownwardProtocol {
  544.   private:
  545.     bool done;
  546.     JobConf* jobConf;
  547.     string key;
  548.     const string* newKey;
  549.     const string* value;
  550.     bool hasTask;
  551.     bool isNewKey;
  552.     bool isNewValue;
  553.     string* inputKeyClass;
  554.     string* inputValueClass;
  555.     string status;
  556.     float progressFloat;
  557.     uint64_t lastProgress;
  558.     bool statusSet;
  559.     Protocol* protocol;
  560.     UpwardProtocol *uplink;
  561.     string* inputSplit;
  562.     RecordReader* reader;
  563.     Mapper* mapper;
  564.     Reducer* reducer;
  565.     RecordWriter* writer;
  566.     Partitioner* partitioner;
  567.     int numReduces;
  568.     const Factory* factory;
  569.     pthread_mutex_t mutexDone;
  570.     std::vector<int> registeredCounterIds;
  571.   public:
  572.     TaskContextImpl(const Factory& _factory) {
  573.       statusSet = false;
  574.       done = false;
  575.       newKey = NULL;
  576.       factory = &_factory;
  577.       jobConf = NULL;
  578.       inputKeyClass = NULL;
  579.       inputValueClass = NULL;
  580.       inputSplit = NULL;
  581.       mapper = NULL;
  582.       reducer = NULL;
  583.       reader = NULL;
  584.       writer = NULL;
  585.       partitioner = NULL;
  586.       protocol = NULL;
  587.       isNewKey = false;
  588.       isNewValue = false;
  589.       lastProgress = 0;
  590.       progressFloat = 0.0f;
  591.       hasTask = false;
  592.       pthread_mutex_init(&mutexDone, NULL);
  593.     }
  594.     void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
  595.       protocol = _protocol;
  596.       uplink = _uplink;
  597.     }
  598.     virtual void start(int protocol) {
  599.       if (protocol != 0) {
  600.         throw Error("Protocol version " + toString(protocol) + 
  601.                     " not supported");
  602.       }
  603.     }
  604.     virtual void setJobConf(vector<string> values) {
  605.       int len = values.size();
  606.       JobConfImpl* result = new JobConfImpl();
  607.       HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
  608.       for(int i=0; i < len; i += 2) {
  609.         result->set(values[i], values[i+1]);
  610.       }
  611.       jobConf = result;
  612.     }
  613.     virtual void setInputTypes(string keyType, string valueType) {
  614.       inputKeyClass = new string(keyType);
  615.       inputValueClass = new string(valueType);
  616.     }
  617.     virtual void runMap(string _inputSplit, int _numReduces, bool pipedInput) {
  618.       inputSplit = new string(_inputSplit);
  619.       reader = factory->createRecordReader(*this);
  620.       HADOOP_ASSERT((reader == NULL) == pipedInput,
  621.                     pipedInput ? "RecordReader defined when not needed.":
  622.                     "RecordReader not defined");
  623.       if (reader != NULL) {
  624.         value = new string();
  625.       }
  626.       mapper = factory->createMapper(*this);
  627.       numReduces = _numReduces;
  628.       if (numReduces != 0) { 
  629.         reducer = factory->createCombiner(*this);
  630.         partitioner = factory->createPartitioner(*this);
  631.       }
  632.       if (reducer != NULL) {
  633.         int64_t spillSize = 100;
  634.         if (jobConf->hasKey("io.sort.mb")) {
  635.           spillSize = jobConf->getInt("io.sort.mb");
  636.         }
  637.         writer = new CombineRunner(spillSize * 1024 * 1024, this, reducer, 
  638.                                    uplink, partitioner, numReduces);
  639.       }
  640.       hasTask = true;
  641.     }
  642.     virtual void mapItem(const string& _key, const string& _value) {
  643.       newKey = &_key;
  644.       value = &_value;
  645.       isNewKey = true;
  646.     }
  647.     virtual void runReduce(int reduce, bool pipedOutput) {
  648.       reducer = factory->createReducer(*this);
  649.       writer = factory->createRecordWriter(*this);
  650.       HADOOP_ASSERT((writer == NULL) == pipedOutput,
  651.                     pipedOutput ? "RecordWriter defined when not needed.":
  652.                     "RecordWriter not defined");
  653.       hasTask = true;
  654.     }
  655.     virtual void reduceKey(const string& _key) {
  656.       isNewKey = true;
  657.       newKey = &_key;
  658.     }
  659.     virtual void reduceValue(const string& _value) {
  660.       isNewValue = true;
  661.       value = &_value;
  662.     }
  663.     
  664.     virtual bool isDone() {
  665.       pthread_mutex_lock(&mutexDone);
  666.       bool doneCopy = done;
  667.       pthread_mutex_unlock(&mutexDone);
  668.       return doneCopy;
  669.     }
  670.     virtual void close() {
  671.       pthread_mutex_lock(&mutexDone);
  672.       done = true;
  673.       pthread_mutex_unlock(&mutexDone);
  674.     }
  675.     virtual void abort() {
  676.       throw Error("Aborted by driver");
  677.     }
  678.     void waitForTask() {
  679.       while (!done && !hasTask) {
  680.         protocol->nextEvent();
  681.       }
  682.     }
  683.     bool nextKey() {
  684.       if (reader == NULL) {
  685.         while (!isNewKey) {
  686.           nextValue();
  687.           if (done) {
  688.             return false;
  689.           }
  690.         }
  691.         key = *newKey;
  692.       } else {
  693.         if (!reader->next(key, const_cast<string&>(*value))) {
  694.           pthread_mutex_lock(&mutexDone);
  695.           done = true;
  696.           pthread_mutex_unlock(&mutexDone);
  697.           return false;
  698.         }
  699.         progressFloat = reader->getProgress();
  700.       }
  701.       isNewKey = false;
  702.       if (mapper != NULL) {
  703.         mapper->map(*this);
  704.       } else {
  705.         reducer->reduce(*this);
  706.       }
  707.       return true;
  708.     }
  709.     /**
  710.      * Advance to the next value.
  711.      */
  712.     virtual bool nextValue() {
  713.       if (isNewKey || done) {
  714.         return false;
  715.       }
  716.       isNewValue = false;
  717.       progress();
  718.       protocol->nextEvent();
  719.       return isNewValue;
  720.     }
  721.     /**
  722.      * Get the JobConf for the current task.
  723.      */
  724.     virtual JobConf* getJobConf() {
  725.       return jobConf;
  726.     }
  727.     /**
  728.      * Get the current key. 
  729.      * @return the current key or NULL if called before the first map or reduce
  730.      */
  731.     virtual const string& getInputKey() {
  732.       return key;
  733.     }
  734.     /**
  735.      * Get the current value. 
  736.      * @return the current value or NULL if called before the first map or 
  737.      *    reduce
  738.      */
  739.     virtual const string& getInputValue() {
  740.       return *value;
  741.     }
  742.     /**
  743.      * Mark your task as having made progress without changing the status 
  744.      * message.
  745.      */
  746.     virtual void progress() {
  747.       if (uplink != 0) {
  748.         uint64_t now = getCurrentMillis();
  749.         if (now - lastProgress > 1000) {
  750.           lastProgress = now;
  751.           if (statusSet) {
  752.             uplink->status(status);
  753.             statusSet = false;
  754.           }
  755.           uplink->progress(progressFloat);
  756.         }
  757.       }
  758.     }
  759.     /**
  760.      * Set the status message and call progress.
  761.      */
  762.     virtual void setStatus(const string& status) {
  763.       this->status = status;
  764.       statusSet = true;
  765.       progress();
  766.     }
  767.     /**
  768.      * Get the name of the key class of the input to this task.
  769.      */
  770.     virtual const string& getInputKeyClass() {
  771.       return *inputKeyClass;
  772.     }
  773.     /**
  774.      * Get the name of the value class of the input to this task.
  775.      */
  776.     virtual const string& getInputValueClass() {
  777.       return *inputValueClass;
  778.     }
  779.     /**
  780.      * Access the InputSplit of the mapper.
  781.      */
  782.     virtual const std::string& getInputSplit() {
  783.       return *inputSplit;
  784.     }
  785.     virtual void emit(const string& key, const string& value) {
  786.       progress();
  787.       if (writer != NULL) {
  788.         writer->emit(key, value);
  789.       } else if (partitioner != NULL) {
  790.         int part = partitioner->partition(key, numReduces);
  791.         uplink->partitionedOutput(part, key, value);
  792.       } else {
  793.         uplink->output(key, value);
  794.       }
  795.     }
  796.     /**
  797.      * Register a counter with the given group and name.
  798.      */
  799.     virtual Counter* getCounter(const std::string& group, 
  800.                                const std::string& name) {
  801.       int id = registeredCounterIds.size();
  802.       registeredCounterIds.push_back(id);
  803.       uplink->registerCounter(id, group, name);
  804.       return new Counter(id);
  805.     }
  806.     /**
  807.      * Increment the value of the counter with the given amount.
  808.      */
  809.     virtual void incrementCounter(const Counter* counter, uint64_t amount) {
  810.       uplink->incrementCounter(counter, amount); 
  811.     }
  812.     void closeAll() {
  813.       if (reader) {
  814.         reader->close();
  815.       }
  816.       if (mapper) {
  817.         mapper->close();
  818.       }
  819.       if (reducer) {
  820.         reducer->close();
  821.       }
  822.       if (writer) {
  823.         writer->close();
  824.       }
  825.     }
  826.     virtual ~TaskContextImpl() {
  827.       delete jobConf;
  828.       delete inputKeyClass;
  829.       delete inputValueClass;
  830.       delete inputSplit;
  831.       if (reader) {
  832.         delete value;
  833.       }
  834.       delete reader;
  835.       delete mapper;
  836.       delete reducer;
  837.       delete writer;
  838.       delete partitioner;
  839.       pthread_mutex_destroy(&mutexDone);
  840.     }
  841.   };
  842.   /**
  843.    * Ping the parent every 5 seconds to know if it is alive 
  844.    */
  845.   void* ping(void* ptr) {
  846.     TaskContextImpl* context = (TaskContextImpl*) ptr;
  847.     char* portStr = getenv("hadoop.pipes.command.port");
  848.     int MAX_RETRIES = 3;
  849.     int remaining_retries = MAX_RETRIES;
  850.     while (!context->isDone()) {
  851.       try{
  852.         sleep(5);
  853.         int sock = -1;
  854.         if (portStr) {
  855.           sock = socket(PF_INET, SOCK_STREAM, 0);
  856.           HADOOP_ASSERT(sock != - 1,
  857.                         string("problem creating socket: ") + strerror(errno));
  858.           sockaddr_in addr;
  859.           addr.sin_family = AF_INET;
  860.           addr.sin_port = htons(toInt(portStr));
  861.           addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  862.           HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
  863.                         string("problem connecting command socket: ") +
  864.                         strerror(errno));
  865.         }
  866.         if (sock != -1) {
  867.           int result = shutdown(sock, SHUT_RDWR);
  868.           HADOOP_ASSERT(result == 0, "problem shutting socket");
  869.           result = close(sock);
  870.           HADOOP_ASSERT(result == 0, "problem closing socket");
  871.         }
  872.         remaining_retries = MAX_RETRIES;
  873.       } catch (Error& err) {
  874.         if (!context->isDone()) {
  875.           fprintf(stderr, "Hadoop Pipes Exception: in ping %sn", 
  876.                 err.getMessage().c_str());
  877.           remaining_retries -= 1;
  878.           if (remaining_retries == 0) {
  879.             exit(1);
  880.           }
  881.         } else {
  882.           return NULL;
  883.         }
  884.       }
  885.     }
  886.     return NULL;
  887.   }
  888.   /**
  889.    * Run the assigned task in the framework.
  890.    * The user's main function should set the various functions using the 
  891.    * set* functions above and then call this.
  892.    * @return true, if the task succeeded.
  893.    */
  894.   bool runTask(const Factory& factory) {
  895.     try {
  896.       TaskContextImpl* context = new TaskContextImpl(factory);
  897.       Protocol* connection;
  898.       char* portStr = getenv("hadoop.pipes.command.port");
  899.       int sock = -1;
  900.       FILE* stream = NULL;
  901.       FILE* outStream = NULL;
  902.       char *bufin = NULL;
  903.       char *bufout = NULL;
  904.       if (portStr) {
  905.         sock = socket(PF_INET, SOCK_STREAM, 0);
  906.         HADOOP_ASSERT(sock != - 1,
  907.                       string("problem creating socket: ") + strerror(errno));
  908.         sockaddr_in addr;
  909.         addr.sin_family = AF_INET;
  910.         addr.sin_port = htons(toInt(portStr));
  911.         addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  912.         HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
  913.                       string("problem connecting command socket: ") +
  914.                       strerror(errno));
  915.         stream = fdopen(sock, "r");
  916.         outStream = fdopen(sock, "w");
  917.         // increase buffer size
  918.         int bufsize = 128*1024;
  919.         int setbuf;
  920.         bufin = new char[bufsize];
  921.         bufout = new char[bufsize];
  922.         setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
  923.         HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
  924.                                      + strerror(errno));
  925.         setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
  926.         HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
  927.                                      + strerror(errno));
  928.         connection = new BinaryProtocol(stream, context, outStream);
  929.       } else if (getenv("hadoop.pipes.command.file")) {
  930.         char* filename = getenv("hadoop.pipes.command.file");
  931.         string outFilename = filename;
  932.         outFilename += ".out";
  933.         stream = fopen(filename, "r");
  934.         outStream = fopen(outFilename.c_str(), "w");
  935.         connection = new BinaryProtocol(stream, context, outStream);
  936.       } else {
  937.         connection = new TextProtocol(stdin, context, stdout);
  938.       }
  939.       context->setProtocol(connection, connection->getUplink());
  940.       pthread_t pingThread;
  941.       pthread_create(&pingThread, NULL, ping, (void*)(context));
  942.       context->waitForTask();
  943.       while (!context->isDone()) {
  944.         context->nextKey();
  945.       }
  946.       context->closeAll();
  947.       connection->getUplink()->done();
  948.       pthread_join(pingThread,NULL);
  949.       delete context;
  950.       delete connection;
  951.       if (stream != NULL) {
  952.         fflush(stream);
  953.       }
  954.       if (outStream != NULL) {
  955.         fflush(outStream);
  956.       }
  957.       fflush(stdout);
  958.       if (sock != -1) {
  959.         int result = shutdown(sock, SHUT_RDWR);
  960.         HADOOP_ASSERT(result == 0, "problem shutting socket");
  961.         result = close(sock);
  962.         HADOOP_ASSERT(result == 0, "problem closing socket");
  963.       }
  964.       if (stream != NULL) {
  965.         //fclose(stream);
  966.       }
  967.       if (outStream != NULL) {
  968.         //fclose(outStream);
  969.       } 
  970.       delete bufin;
  971.       delete bufout;
  972.       return true;
  973.     } catch (Error& err) {
  974.       fprintf(stderr, "Hadoop Pipes Exception: %sn", 
  975.               err.getMessage().c_str());
  976.       return false;
  977.     }
  978.   }
  979. }