SignalSender.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:6k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "SignalSender.hpp"
  14. #include "ConfigRetriever.hpp"
  15. #include <NdbSleep.h>
  16. #include <SignalLoggerManager.hpp>
  17. SimpleSignal::SimpleSignal(bool dealloc){
  18.   memset(this, 0, sizeof(* this));
  19.   deallocSections = dealloc;
  20. }
  21. SimpleSignal::~SimpleSignal(){
  22.   if(!deallocSections)
  23.     return;
  24.   if(ptr[0].p != 0) delete []ptr[0].p;
  25.   if(ptr[1].p != 0) delete []ptr[1].p;
  26.   if(ptr[2].p != 0) delete []ptr[2].p;
  27. }
  28. void 
  29. SimpleSignal::set(class SignalSender& ss,
  30.   Uint8  trace, Uint16 recBlock, Uint16 gsn, Uint32 len){
  31.   
  32.   header.theTrace                = trace;
  33.   header.theReceiversBlockNumber = recBlock;
  34.   header.theVerId_signalNumber   = gsn;
  35.   header.theLength               = len;
  36.   header.theSendersBlockRef      = refToBlock(ss.getOwnRef());
  37. }
  38. void
  39. SimpleSignal::print(FILE * out){
  40.   fprintf(out, "---- Signal ----------------n");
  41.   SignalLoggerManager::printSignalHeader(out, header, 0, 0, false);
  42.   SignalLoggerManager::printSignalData(out, header, theData);
  43.   for(Uint32 i = 0; i<header.m_noOfSections; i++){
  44.     Uint32 len = ptr[i].sz;
  45.     fprintf(out, " --- Section %d size=%d ---n", i, len);
  46.     Uint32 * signalData = ptr[i].p;
  47.     while(len >= 7){
  48.       fprintf(out, 
  49.               " H'%.8x H'%.8x H'%.8x H'%.8x H'%.8x H'%.8x H'%.8xn",
  50.               signalData[0], signalData[1], signalData[2], signalData[3], 
  51.               signalData[4], signalData[5], signalData[6]);
  52.       len -= 7;
  53.       signalData += 7;
  54.     }
  55.     if(len > 0){
  56.       fprintf(out, " H'%.8x", signalData[0]);
  57.       for(Uint32 i = 1; i<len; i++)
  58.         fprintf(out, " H'%.8x", signalData[i]);
  59.       fprintf(out, "n");
  60.     }
  61.   }
  62. }
  63. SignalSender::SignalSender(const char * connectString){
  64.   m_cond = NdbCondition_Create();
  65.   theFacade = TransporterFacade::start_instance(connectString);
  66.   m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
  67.   assert(m_blockNo > 0);
  68. }
  69. SignalSender::~SignalSender(){
  70.   theFacade->close(m_blockNo);
  71.   theFacade->stop_instance();
  72.   NdbCondition_Destroy(m_cond);
  73. }
  74. Uint32
  75. SignalSender::getOwnRef() const {
  76.   return numberToRef(m_blockNo, theFacade->ownId());
  77. }
  78. bool
  79. SignalSender::connectOne(Uint32 timeOutMillis){
  80.   NDB_TICKS start = NdbTick_CurrentMillisecond();
  81.   NDB_TICKS now = start;
  82.   while(theFacade->theClusterMgr->getNoOfConnectedNodes() == 0 &&
  83. (timeOutMillis == 0 || (now - start) < timeOutMillis)){
  84.     NdbSleep_MilliSleep(100);
  85.   }
  86.   return theFacade->theClusterMgr->getNoOfConnectedNodes() > 0;
  87. }
  88. bool
  89. SignalSender::connectAll(Uint32 timeOutMillis){
  90.   NDB_TICKS start = NdbTick_CurrentMillisecond();
  91.   NDB_TICKS now = start;
  92.   while(theFacade->theClusterMgr->getNoOfConnectedNodes() < 1 &&
  93. (timeOutMillis == 0 || (now - start) < timeOutMillis)){
  94.     NdbSleep_MilliSleep(100);
  95.   }
  96.   return theFacade->theClusterMgr->getNoOfConnectedNodes() >= 1;
  97. }
  98. Uint32
  99. SignalSender::getAliveNode(){
  100.   return theFacade->get_an_alive_node();
  101. }
  102. const ClusterMgr::Node & 
  103. SignalSender::getNodeInfo(Uint16 nodeId) const {
  104.   return theFacade->theClusterMgr->getNodeInfo(nodeId);
  105. }
  106. Uint32
  107. SignalSender::getNoOfConnectedNodes() const {
  108.   return theFacade->theClusterMgr->getNoOfConnectedNodes();
  109. }
  110. SendStatus
  111. SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
  112.   return theFacade->theTransporterRegistry->prepareSend(&s->header,
  113. 1, // JBB
  114. &s->theData[0],
  115. nodeId, 
  116. &s->ptr[0]);
  117. }
  118. template<class T>
  119. SimpleSignal *
  120. SignalSender::waitFor(Uint32 timeOutMillis, T & t){
  121.   
  122.   Guard g(theFacade->theMutexPtr);
  123.   
  124.   SimpleSignal * s = t.check(m_jobBuffer);
  125.   if(s != 0){
  126.     return s;
  127.   }
  128.   
  129.   NDB_TICKS now = NdbTick_CurrentMillisecond();
  130.   NDB_TICKS stop = now + timeOutMillis;
  131.   Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis);
  132.   do {
  133.     NdbCondition_WaitTimeout(m_cond,
  134.      theFacade->theMutexPtr, 
  135.      wait);
  136.     
  137.     
  138.     SimpleSignal * s = t.check(m_jobBuffer);
  139.     if(s != 0){
  140.       return s;
  141.     }
  142.     
  143.     now = NdbTick_CurrentMillisecond();
  144.     wait = (timeOutMillis == 0 ? 10 : stop - now);
  145.   } while(stop > now || timeOutMillis == 0);
  146.   
  147.   return 0;
  148. class WaitForAny {
  149. public:
  150.   SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
  151.     if(m_jobBuffer.size() > 0){
  152.       SimpleSignal * s = m_jobBuffer[0];
  153.       m_jobBuffer.erase(0);
  154. return s;
  155.     }
  156.       return 0;
  157.   }
  158. };
  159.   
  160. SimpleSignal *
  161. SignalSender::waitFor(Uint32 timeOutMillis){
  162.   
  163.   WaitForAny w;
  164.   return waitFor(timeOutMillis, w);
  165. }
  166. class WaitForNode {
  167. public:
  168.   Uint32 m_nodeId;
  169.   SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
  170.     Uint32 len = m_jobBuffer.size();
  171.     for(Uint32 i = 0; i<len; i++){
  172.       if(refToNode(m_jobBuffer[i]->header.theSendersBlockRef) == m_nodeId){
  173. SimpleSignal * s = m_jobBuffer[i];
  174. m_jobBuffer.erase(i);
  175. return s;
  176.       }
  177.     }
  178.     return 0;
  179.   }
  180. };
  181. SimpleSignal *
  182. SignalSender::waitFor(Uint16 nodeId, Uint32 timeOutMillis){
  183.   
  184.   WaitForNode w;
  185.   w.m_nodeId = nodeId;
  186.   return waitFor(timeOutMillis, w);
  187. }
  188. #include <NdbApiSignal.hpp>
  189. void
  190. SignalSender::execSignal(void* signalSender, 
  191.  NdbApiSignal* signal, 
  192.  class LinearSectionPtr ptr[3]){
  193.   SimpleSignal * s = new SimpleSignal(true);
  194.   s->header = * signal;
  195.   memcpy(&s->theData[0], signal->getDataPtr(), 4 * s->header.theLength);
  196.   for(Uint32 i = 0; i<s->header.m_noOfSections; i++){
  197.     s->ptr[i].p = new Uint32[ptr[i].sz];
  198.     s->ptr[i].sz = ptr[i].sz;
  199.     memcpy(s->ptr[i].p, ptr[i].p, 4 * ptr[i].sz);
  200.   }
  201.   SignalSender * ss = (SignalSender*)signalSender;
  202.   ss->m_jobBuffer.push_back(s);
  203.   NdbCondition_Signal(ss->m_cond);
  204. }
  205.   
  206. void 
  207. SignalSender::execNodeStatus(void* signalSender, 
  208.      Uint16 NodeId, 
  209.      bool alive, 
  210.      bool nfCompleted){
  211. }
  212.