OSE_Receiver.hpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:3k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #ifndef OSE_RECEIVER_HPP
  14. #define OSE_RECEIVER_HPP
  15. #include "ose.h"
  16. #include "OSE_Signals.hpp"
  17. #include <kernel_types.h>
  18. class OSE_Receiver {
  19. public:
  20.   OSE_Receiver(class TransporterRegistry *,
  21.        int recBufSize,
  22.        NodeId localNodeId);
  23.   ~OSE_Receiver();
  24.   bool hasData() const ;
  25.   bool isFull() const ;
  26.   
  27.   Uint32 getReceiveData(NodeId * remoteNodeId,
  28. Uint32 ** readPtr);
  29.   
  30.   void updateReceiveDataPtr(Uint32 szRead);
  31.   
  32.   bool doReceive(Uint32 timeOutMillis);
  33.   PROCESS createPhantom();
  34.   void destroyPhantom();
  35.   
  36. private:
  37.   class TransporterRegistry * theTransporterRegistry;
  38.   
  39.   NodeId localNodeId;
  40.   char localHostName[255];
  41.   
  42.   bool phantomCreated;
  43.   PROCESS phantomPid;
  44.   struct OS_redir_entry redir;
  45.   
  46.   int recBufReadIndex;
  47.   int recBufWriteIndex;
  48.   int recBufSize;
  49.   union SIGNAL **receiveBuffer;
  50.   // Stack for signals that are received out of order
  51.   int waitStackCount;
  52.   int waitStackSize;
  53.   union SIGNAL** waitStack;
  54.   // Counters for the next signal id
  55.   Uint32* nextSigId;
  56.   class OSE_Transporter * getTransporter(NodeId nodeId);
  57.   void insertReceiveBuffer(union SIGNAL * _sig);
  58.   void clearRecvBuffer(NodeId _nodeId);
  59.   bool checkWaitStack(NodeId _nodeId);
  60.   void clearWaitStack(NodeId _nodeId);
  61.   void insertWaitStack(union SIGNAL* _sig);
  62. };
  63. inline
  64. bool
  65. OSE_Receiver::hasData () const {
  66.   return recBufReadIndex != recBufWriteIndex;
  67. }
  68. inline
  69. bool
  70. OSE_Receiver::isFull () const {
  71.   return ((recBufWriteIndex + 1) % recBufSize) == recBufWriteIndex;
  72. }
  73. inline
  74. Uint32
  75. OSE_Receiver::getReceiveData(NodeId * remoteNodeId,
  76.      Uint32 ** readPtr){
  77.   NdbTransporterData *s = (NdbTransporterData *)receiveBuffer[recBufReadIndex];
  78.   if(recBufReadIndex != recBufWriteIndex){
  79.     * remoteNodeId = s->senderNodeId;
  80.     * readPtr      = &s->data[0];
  81.     return s->length;
  82.   }
  83.   return 0;
  84. }
  85. inline
  86. void
  87. OSE_Receiver::updateReceiveDataPtr(Uint32 bytesRead){
  88.   if(bytesRead != 0){
  89.     free_buf(&receiveBuffer[recBufReadIndex]);
  90.     recBufReadIndex = (recBufReadIndex + 1) % recBufSize;
  91.   }
  92. }
  93. inline 
  94. void
  95. OSE_Receiver::insertReceiveBuffer(union SIGNAL * _sig){
  96.   receiveBuffer[recBufWriteIndex] = _sig;
  97.   recBufWriteIndex = (recBufWriteIndex + 1) % recBufSize;
  98. }
  99. #endif