qtconcurrentreducekernel.h
上传用户:detong
上传日期:2022-06-22
资源大小:20675k
文件大小:7k
源码类别:

系统编程

开发平台:

Unix_Linux

  1. /****************************************************************************
  2. **
  3. ** Copyright (C) 2008 Nokia Corporation and/or its subsidiary(-ies).
  4. ** Contact: Qt Software Information (qt-info@nokia.com)
  5. **
  6. ** This file is part of the QtCore module of the Qt Toolkit.
  7. **
  8. ** Commercial Usage
  9. ** Licensees holding valid Qt Commercial licenses may use this file in
  10. ** accordance with the Qt Commercial License Agreement provided with the
  11. ** Software or, alternatively, in accordance with the terms contained in
  12. ** a written agreement between you and Nokia.
  13. **
  14. **
  15. ** GNU General Public License Usage
  16. ** Alternatively, this file may be used under the terms of the GNU
  17. ** General Public License versions 2.0 or 3.0 as published by the Free
  18. ** Software Foundation and appearing in the file LICENSE.GPL included in
  19. ** the packaging of this file.  Please review the following information
  20. ** to ensure GNU General Public Licensing requirements will be met:
  21. ** http://www.fsf.org/licensing/licenses/info/GPLv2.html and
  22. ** http://www.gnu.org/copyleft/gpl.html.  In addition, as a special
  23. ** exception, Nokia gives you certain additional rights. These rights
  24. ** are described in the Nokia Qt GPL Exception version 1.3, included in
  25. ** the file GPL_EXCEPTION.txt in this package.
  26. **
  27. ** Qt for Windows(R) Licensees
  28. ** As a special exception, Nokia, as the sole copyright holder for Qt
  29. ** Designer, grants users of the Qt/Eclipse Integration plug-in the
  30. ** right for the Qt/Eclipse Integration to link to functionality
  31. ** provided by Qt Designer and its related libraries.
  32. **
  33. ** If you are unsure which license is appropriate for your use, please
  34. ** contact the sales department at qt-sales@nokia.com.
  35. **
  36. ****************************************************************************/
  37. #ifndef QTCONCURRENT_REDUCEKERNEL_H
  38. #define QTCONCURRENT_REDUCEKERNEL_H
  39. #include <QtCore/qglobal.h>
  40. #ifndef QT_NO_CONCURRENT
  41. #include <QtCore/qatomic.h>
  42. #include <QtCore/qlist.h>
  43. #include <QtCore/qmap.h>
  44. #include <QtCore/qmutex.h>
  45. #include <QtCore/qthread.h>
  46. #include <QtCore/qvector.h>
  47. QT_BEGIN_HEADER
  48. QT_BEGIN_NAMESPACE
  49. QT_MODULE(Core)
  50. namespace QtConcurrent {
  51. #ifndef qdoc
  52. /*
  53.     The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
  54.     limit the reduce queue size for MapReduce. When the number of
  55.     reduce blocks in the queue exceeds ReduceQueueStartLimit,
  56.     MapReduce won't start any new threads, and when it exceeds
  57.     ReduceQueueThrottleLimit running threads will be stopped.
  58. */
  59. enum {
  60.     ReduceQueueStartLimit = 20,
  61.     ReduceQueueThrottleLimit = 30
  62. };
  63. // IntermediateResults holds a block of intermediate results from a
  64. // map or filter functor. The begin/end offsets indicates the origin
  65. // and range of the block.
  66. template <typename T>
  67. class IntermediateResults
  68. {
  69. public:
  70.     int begin, end;
  71.     QVector<T> vector;
  72. };
  73. #endif // qdoc
  74. enum ReduceOption {
  75.     UnorderedReduce = 0x1,
  76.     OrderedReduce = 0x2,
  77.     SequentialReduce = 0x4
  78.     // ParallelReduce = 0x8
  79. };
  80. Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
  81. Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
  82. #ifndef qdoc
  83. // supports both ordered and out-of-order reduction
  84. template <typename ReduceFunctor, typename ReduceResultType, typename T>
  85. class ReduceKernel
  86. {
  87.     typedef QMap<int, IntermediateResults<T> > ResultsMap;
  88.     const ReduceOptions reduceOptions;
  89.     QMutex mutex;
  90.     int progress, resultsMapSize;
  91.     ResultsMap resultsMap;
  92.     bool canReduce(int begin) const
  93.     {
  94.         return (((reduceOptions & UnorderedReduce)
  95.                  && progress == 0)
  96.                 || ((reduceOptions & OrderedReduce)
  97.                     && progress == begin));
  98.     }
  99.     void reduceResult(ReduceFunctor &reduce,
  100.                       ReduceResultType &r,
  101.                       const IntermediateResults<T> &result)
  102.     {
  103.         for (int i = 0; i < result.vector.size(); ++i) {
  104.             reduce(r, result.vector.at(i));
  105.         }
  106.     }
  107.     void reduceResults(ReduceFunctor &reduce,
  108.                        ReduceResultType &r,
  109.                        ResultsMap &map)
  110.     {
  111.         typename ResultsMap::iterator it = map.begin();
  112.         while (it != map.end()) {
  113.             reduceResult(reduce, r, it.value());
  114.             ++it;
  115.         }
  116.     }
  117. public:
  118.     ReduceKernel(ReduceOptions _reduceOptions)
  119.         : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0)
  120.     { }
  121.     void runReduce(ReduceFunctor &reduce,
  122.                    ReduceResultType &r,
  123.                    const IntermediateResults<T> &result)
  124.     {
  125.         QMutexLocker locker(&mutex);
  126.         if (!canReduce(result.begin)) {
  127.             ++resultsMapSize;
  128.             resultsMap.insert(result.begin, result);
  129.             return;
  130.         }
  131.         if (reduceOptions & UnorderedReduce) {
  132.             // UnorderedReduce
  133.             progress = -1;
  134.             // reduce this result
  135.             locker.unlock();
  136.             reduceResult(reduce, r, result);
  137.             locker.relock();
  138.             // reduce all stored results as well
  139.             while (!resultsMap.isEmpty()) {
  140.                 ResultsMap resultsMapCopy = resultsMap;
  141.                 resultsMap.clear();
  142.                 locker.unlock();
  143.                 reduceResults(reduce, r, resultsMapCopy);
  144.                 locker.relock();
  145.                 resultsMapSize -= resultsMapCopy.size();
  146.             }
  147.             progress = 0;
  148.         } else {
  149.             // reduce this result
  150.             locker.unlock();
  151.             reduceResult(reduce, r, result);
  152.             locker.relock();
  153.             // OrderedReduce
  154.             progress += result.end - result.begin;
  155.             // reduce as many other results as possible
  156.             typename ResultsMap::iterator it = resultsMap.begin();
  157.             while (it != resultsMap.end()) {
  158.                 if (it.value().begin != progress)
  159.                     break;
  160.                 locker.unlock();
  161.                 reduceResult(reduce, r, it.value());
  162.                 locker.relock();
  163.                 --resultsMapSize;
  164.                 progress += it.value().end - it.value().begin;
  165.                 it = resultsMap.erase(it);
  166.             }
  167.         }
  168.     }
  169.     // final reduction
  170.     void finish(ReduceFunctor &reduce, ReduceResultType &r)
  171.     {
  172.         reduceResults(reduce, r, resultsMap);
  173.     }
  174.     inline bool shouldThrottle()
  175.     {
  176.         return (resultsMapSize > (ReduceQueueThrottleLimit * QThread::idealThreadCount()));
  177.     }
  178.     inline bool shouldStartThread()
  179.     {
  180.         return (resultsMapSize <= (ReduceQueueStartLimit * QThread::idealThreadCount()));
  181.     }
  182. };
  183. template <typename Sequence, typename Base, typename Functor1, typename Functor2>
  184. struct SequenceHolder2 : public Base
  185. {
  186.     SequenceHolder2(const Sequence &_sequence,
  187.                     Functor1 functor1,
  188.                     Functor2 functor2,
  189.                     ReduceOptions reduceOptions)
  190.         : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
  191.           sequence(_sequence)
  192.     { }
  193.     Sequence sequence;
  194.     void finish()
  195.     {
  196.         Base::finish();
  197.         // Clear the sequence to make sure all temporaries are destroyed
  198.         // before finished is signaled.
  199.         sequence = Sequence();
  200.     }
  201. };
  202. #endif //qdoc
  203. } // namespace QtConcurrent
  204. QT_END_NAMESPACE
  205. QT_END_HEADER
  206. #endif // QT_NO_CONCURRENT
  207. #endif