NdbEventOperationImpl.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:30k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <kernel_types.h>
  15. #include "NdbDictionaryImpl.hpp"
  16. #include "API.hpp"
  17. #include <NdbOut.hpp>
  18. #include "NdbApiSignal.hpp"
  19. #include "TransporterFacade.hpp"
  20. #include <signaldata/CreateEvnt.hpp>
  21. #include <signaldata/SumaImpl.hpp>
  22. #include <SimpleProperties.hpp>
  23. #include <Bitmask.hpp>
  24. #include <AttributeHeader.hpp>
  25. #include <AttributeList.hpp>
  26. #include <NdbError.hpp>
  27. #include <BaseString.hpp>
  28. #include <UtilBuffer.hpp>
  29. #include <NdbDictionary.hpp>
  30. #include <Ndb.hpp>
  31. #include "NdbImpl.hpp"
  32. #include "DictCache.hpp"
  33. #include <portlib/NdbMem.h>
  34. #include <NdbRecAttr.hpp>
  35. #include <NdbEventOperation.hpp>
  36. #include "NdbEventOperationImpl.hpp"
  37. /*
  38.  * Class NdbEventOperationImpl
  39.  *
  40.  *
  41.  */
  42. //#define EVENT_DEBUG
  43. NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
  44.      Ndb *theNdb, 
  45.      const char* eventName, 
  46.      const int bufferLength) 
  47.   : NdbEventOperation(*this), m_ndb(theNdb),
  48.     m_state(ERROR), m_bufferL(bufferLength)
  49. {
  50.   m_eventId = 0;
  51.   theFirstRecAttrs[0] = NULL;
  52.   theCurrentRecAttrs[0] = NULL;
  53.   theFirstRecAttrs[1] = NULL;
  54.   theCurrentRecAttrs[1] = NULL;
  55.   sdata = NULL;
  56.   ptr[0].p = NULL;
  57.   ptr[1].p = NULL;
  58.   ptr[2].p = NULL;
  59.   // we should lookup id in Dictionary, TODO
  60.   // also make sure we only have one listener on each event
  61.   if (!m_ndb) { ndbout_c("m_ndb=NULL"); return; }
  62.   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  63.   if (!myDict) { ndbout_c("getDictionary=NULL"); return; }
  64.   const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
  65.   if (!myEvnt) { ndbout_c("getEvent()=NULL"); return; }
  66.   m_eventImpl = &myEvnt->m_impl;
  67.   if (!m_eventImpl) { ndbout_c("m_impl=NULL"); return; }
  68.   m_bufferHandle = m_ndb->getGlobalEventBufferHandle();
  69.   if (m_bufferHandle->m_bufferL > 0) 
  70.     m_bufferL =m_bufferHandle->m_bufferL;
  71.   else
  72.     m_bufferHandle->m_bufferL = m_bufferL;
  73.   m_state = CREATED;
  74. }
  75. NdbEventOperationImpl::~NdbEventOperationImpl()
  76. {
  77.   int i;
  78.   if (sdata) NdbMem_Free(sdata);
  79.   for (i=0 ; i<3; i++) {
  80.     if (ptr[i].p) NdbMem_Free(ptr[i].p);
  81.   }
  82.   for (i=0 ; i<2; i++) {
  83.     NdbRecAttr *p = theFirstRecAttrs[i];
  84.     while (p) {
  85.       NdbRecAttr *p_next = p->next();
  86.       m_ndb->releaseRecAttr(p);
  87.       p = p_next;
  88.     }
  89.   }
  90.   if (m_state == NdbEventOperation::EXECUTING) {
  91.     stop();
  92.     // m_bufferHandle->dropSubscribeEvent(m_bufferId);
  93.     ; // We should send stop signal here
  94.   }
  95. }
  96. NdbEventOperation::State
  97. NdbEventOperationImpl::getState()
  98. {
  99.   return m_state;
  100. }
  101. NdbRecAttr*
  102. NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
  103. {
  104.   if (m_state != NdbEventOperation::CREATED) {
  105.     ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()");
  106.     return NULL;
  107.   }
  108.   NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
  109.   if (tAttrInfo == NULL) {
  110.     ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
  111.     return NULL;
  112.   }
  113.   return NdbEventOperationImpl::getValue(tAttrInfo, aValue, n);
  114. }
  115. NdbRecAttr*
  116. NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
  117. {
  118.   // Insert Attribute Id into ATTRINFO part. 
  119.   NdbRecAttr *&theFirstRecAttr = theFirstRecAttrs[n];
  120.   NdbRecAttr *&theCurrentRecAttr = theCurrentRecAttrs[n];
  121.       
  122.   /************************************************************************
  123.    * Get a Receive Attribute object and link it into the operation object.
  124.    ************************************************************************/
  125.   NdbRecAttr *tRecAttr = m_ndb->getRecAttr();
  126.   if (tRecAttr == NULL) { 
  127.     exit(-1);
  128.     //setErrorCodeAbort(4000);
  129.     return NULL;
  130.   }
  131.   /**********************************************************************
  132.    * Now set the attribute identity and the pointer to the data in 
  133.    * the RecAttr object
  134.    * Also set attribute size, array size and attribute type
  135.    ********************************************************************/
  136.   if (tRecAttr->setup(tAttrInfo, aValue)) {
  137.     //setErrorCodeAbort(4000);
  138.     m_ndb->releaseRecAttr(tRecAttr);
  139.     exit(-1);
  140.     return NULL;
  141.   }
  142.   //theErrorLine++;
  143.   tRecAttr->setNULL();
  144.   
  145.   // We want to keep the list sorted to make data insertion easier later
  146.   if (theFirstRecAttr == NULL) {
  147.     theFirstRecAttr = tRecAttr;
  148.     theCurrentRecAttr = tRecAttr;
  149.     tRecAttr->next(NULL);
  150.   } else {
  151.     Uint32 tAttrId = tAttrInfo->m_attrId;
  152.     if (tAttrId > theCurrentRecAttr->attrId()) { // right order
  153.       theCurrentRecAttr->next(tRecAttr);
  154.       tRecAttr->next(NULL);
  155.       theCurrentRecAttr = tRecAttr;
  156.     } else if (theFirstRecAttr->next() == NULL ||    // only one in list
  157.        theFirstRecAttr->attrId() > tAttrId) {// or first 
  158.       tRecAttr->next(theFirstRecAttr);
  159.       theFirstRecAttr = tRecAttr;
  160.     } else { // at least 2 in list and not first and not last
  161.       NdbRecAttr *p = theFirstRecAttr;
  162.       NdbRecAttr *p_next = p->next();
  163.       while (tAttrId > p_next->attrId()) {
  164. p = p_next;
  165. p_next = p->next();
  166.       }
  167.       if (tAttrId == p_next->attrId()) { // Using same attribute twice
  168. tRecAttr->release(); // do I need to do this?
  169. m_ndb->releaseRecAttr(tRecAttr);
  170. exit(-1);
  171. return NULL;
  172.       }
  173.       // this is it, between p and p_next
  174.       p->next(tRecAttr);
  175.       tRecAttr->next(p_next);
  176.     }
  177.   }
  178.   return tRecAttr;
  179. }
  180. int
  181. NdbEventOperationImpl::execute()
  182. {
  183.   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  184.   if (!myDict) {
  185.     ndbout_c("NdbEventOperation::execute(): getDictionary=NULL");
  186.     return 0;
  187.   }
  188.   if (theFirstRecAttrs[0] == NULL) { // defaults to get all
  189.     
  190.   }
  191.   NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);
  192.   int hasSubscriber;
  193.   m_bufferId = 
  194.     m_bufferHandle->prepareAddSubscribeEvent(m_eventImpl->m_eventId,
  195.      hasSubscriber /* return value */);
  196.   m_eventImpl->m_bufferId = m_bufferId;
  197.   int r = -1;
  198.   if (m_bufferId >= 0) {
  199.     // now we check if there's already a subscriber
  200.     if (hasSubscriber == 0) { // only excute if there's no other subscribers 
  201.       r = myDictImpl.executeSubscribeEvent(*m_eventImpl);
  202.     } else {
  203.       r = 0;
  204.     }
  205.     if (r) {
  206.       //Error
  207.       m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
  208.       m_state = NdbEventOperation::ERROR;
  209.     } else {
  210.       m_bufferHandle->addSubscribeEvent(m_bufferId, this);
  211.       m_state = NdbEventOperation::EXECUTING;
  212.     }
  213.   } else {
  214.     //Error
  215.     m_state = NdbEventOperation::ERROR;
  216.   }
  217.   return r;
  218. }
  219. int
  220. NdbEventOperationImpl::stop()
  221. {
  222.   if (m_state != NdbEventOperation::EXECUTING)
  223.     return -1;
  224.   //  ndbout_c("NdbEventOperation::stopping()");
  225.   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  226.   if (!myDict) {
  227.     ndbout_c("NdbEventOperation::stop(): getDictionary=NULL");
  228.     return 0;
  229.   }
  230.   NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);
  231.   int hasSubscriber;
  232.   int ret = 
  233.     m_bufferHandle->prepareDropSubscribeEvent(m_bufferId,
  234.       hasSubscriber /* return value */);
  235.   if (ret < 0) {
  236.     ndbout_c("prepareDropSubscribeEvent failed");
  237.     return -1;
  238.   }
  239.   //  m_eventImpl->m_bufferId = m_bufferId;
  240.   int r = -1;
  241.   if (hasSubscriber == 0) { // only excute if there's no other subscribers
  242.     r = myDictImpl.stopSubscribeEvent(*m_eventImpl);
  243. #ifdef EVENT_DEBUG
  244.     ndbout_c("NdbEventOperation::stopping() done");
  245. #endif
  246.   } else
  247.     r = 0;
  248.   if (r) {
  249.     //Error
  250.     m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId);
  251.     m_state = NdbEventOperation::ERROR;
  252.   } else {
  253. #ifdef EVENT_DEBUG
  254.     ndbout_c("NdbEventOperation::dropping()");
  255. #endif
  256.     m_bufferHandle->dropSubscribeEvent(m_bufferId);
  257.     m_state = NdbEventOperation::CREATED;
  258.   }
  259.   return r;
  260. }
  261. bool
  262. NdbEventOperationImpl::isConsistent()
  263. {
  264.   return sdata->isGCIConsistent();
  265. }
  266. Uint32
  267. NdbEventOperationImpl::getGCI()
  268. {
  269.   return sdata->gci;
  270. }
  271. Uint32
  272. NdbEventOperationImpl::getLatestGCI()
  273. {
  274.   return NdbGlobalEventBufferHandle::getLatestGCI();
  275. }
  276. int
  277. NdbEventOperationImpl::next(int *pOverrun)
  278. {
  279.   int nr = 10000; // a high value
  280.   int tmpOverrun = 0;
  281.   int *ptmpOverrun;
  282.   if (pOverrun) {
  283.     ptmpOverrun = &tmpOverrun;
  284.   } else
  285.     ptmpOverrun = NULL;
  286.   while (nr > 0) {
  287.     int r=NdbGlobalEventBufferHandle::getDataL(m_bufferId, sdata,
  288.        ptr, pOverrun);
  289.     if (pOverrun) {
  290.       tmpOverrun += *pOverrun;
  291.       *pOverrun = tmpOverrun;
  292.     }
  293.     if (r <= 0) return r; // no data
  294.     if (r < nr) r = nr; else nr--; // we don't want to be stuck here forever
  295.   
  296. #ifdef EVENT_DEBUG
  297.     ndbout_c("!!!!!!!sdata->operation %u", (Uint32)sdata->operation);
  298. #endif
  299.     // now move the data into the RecAttrs
  300.     if ((theFirstRecAttrs[0] == NULL) && 
  301. (theFirstRecAttrs[1] == NULL)) return r;
  302.     // no copying since no RecAttr's
  303.     Uint32 *aAttrPtr = ptr[0].p;
  304.     Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
  305.     Uint32 *aDataPtr = ptr[1].p;
  306. #ifdef EVENT_DEBUG
  307.     int i;
  308.     printf("after values sz=%un", ptr[1].sz);
  309.     for (i=0; i < ptr[1].sz; i++)
  310.       printf ("H'%.8X ",ptr[1].p[i]);
  311.     printf("n");
  312.     printf("before values sz=%un", ptr[2].sz);
  313.     for (i=0; i < ptr[2].sz; i++)
  314.       printf ("H'%.8X ",ptr[2].p[i]);
  315.     printf("n");
  316. #endif
  317.     NdbRecAttr *tWorkingRecAttr = theFirstRecAttrs[0];
  318.     // copy data into the RecAttr's
  319.     // we assume that the respective attribute lists are sorted
  320.     Uint32 tRecAttrId;
  321.     Uint32 tAttrId;
  322.     Uint32 tDataSz;
  323.     int hasSomeData=0;
  324.     while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
  325.       tRecAttrId = tWorkingRecAttr->attrId();
  326.       tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
  327.       tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
  328.       
  329.       while (tAttrId > tRecAttrId) {
  330. //printf("[%u] %u %u [%u]n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
  331. tWorkingRecAttr->setNULL();
  332. tWorkingRecAttr = tWorkingRecAttr->next();
  333. if (tWorkingRecAttr == NULL)
  334.   break;
  335. tRecAttrId = tWorkingRecAttr->attrId();
  336.       }
  337.       if (tWorkingRecAttr == NULL)
  338. break;
  339.       
  340.       //printf("[%u] %u %u [%u]n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
  341.       
  342.       if (tAttrId == tRecAttrId) {
  343. if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
  344.   hasSomeData++;
  345. //printf("set!n");
  346. tWorkingRecAttr->receive_data(aDataPtr, tDataSz);
  347. // move forward, data has already moved forward
  348. aAttrPtr++;
  349. aDataPtr += tDataSz;
  350. tWorkingRecAttr = tWorkingRecAttr->next();
  351.       } else {
  352. // move only attr forward
  353. aAttrPtr++;
  354. aDataPtr += tDataSz;
  355.       }
  356.     }
  357.     
  358.     while (tWorkingRecAttr != NULL) {
  359.       tRecAttrId = tWorkingRecAttr->attrId();
  360.       //printf("set undefined [%u] %u %u [%u]n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
  361.       tWorkingRecAttr->setNULL();
  362.       tWorkingRecAttr = tWorkingRecAttr->next();
  363.     }
  364.     
  365.     tWorkingRecAttr = theFirstRecAttrs[1];
  366.     aDataPtr = ptr[2].p;
  367.     Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz;
  368.     while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
  369.       tRecAttrId = tWorkingRecAttr->attrId();
  370.       tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
  371.       tDataSz = AttributeHeader(*aDataPtr).getDataSize();
  372.       aDataPtr++;
  373.       while (tAttrId > tRecAttrId) {
  374. tWorkingRecAttr->setNULL();
  375. tWorkingRecAttr = tWorkingRecAttr->next();
  376. if (tWorkingRecAttr == NULL)
  377.   break;
  378. tRecAttrId = tWorkingRecAttr->attrId();
  379.       }
  380.       if (tWorkingRecAttr == NULL)
  381. break;
  382.       if (tAttrId == tRecAttrId) {
  383. if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
  384.   hasSomeData++;
  385. tWorkingRecAttr->receive_data(aDataPtr, tDataSz);
  386. aDataPtr += tDataSz;
  387. // move forward, data+attr has already moved forward
  388. tWorkingRecAttr = tWorkingRecAttr->next();
  389.       } else {
  390. // move only data+attr forward
  391. aDataPtr += tDataSz;
  392.       }
  393.     }
  394.     while (tWorkingRecAttr != NULL) {
  395.       tWorkingRecAttr->setNULL();
  396.       tWorkingRecAttr = tWorkingRecAttr->next();
  397.     }
  398.     
  399.     if (hasSomeData)
  400.       return r;
  401.   }
  402.   return 0;
  403. }
  404. NdbDictionary::Event::TableEvent 
  405. NdbEventOperationImpl::getEventType()
  406. {
  407.   switch (sdata->operation) {
  408.   case TriggerEvent::TE_INSERT:
  409.     return NdbDictionary::Event::TE_INSERT;
  410.   case TriggerEvent::TE_DELETE:
  411.     return NdbDictionary::Event::TE_DELETE;
  412.   case TriggerEvent::TE_UPDATE:
  413.     return NdbDictionary::Event::TE_UPDATE;
  414.   default:
  415.     return NdbDictionary::Event::TE_ALL;
  416.   }
  417. }
  418. void
  419. NdbEventOperationImpl::print()
  420. {
  421.   ndbout << "EventId " << m_eventId << "n";
  422.   for (int i = 0; i < 2; i++) {
  423.     NdbRecAttr *p = theFirstRecAttrs[i];
  424.     ndbout << " %u " << i;
  425.     while (p) {
  426.       ndbout << " : " << p->attrId() << " = " << *p;
  427.       p = p->next();
  428.     }
  429.     ndbout << "n";
  430.   }
  431. }
  432. void
  433. NdbEventOperationImpl::printAll()
  434. {
  435.   Uint32 *aAttrPtr = ptr[0].p;
  436.   Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
  437.   Uint32 *aDataPtr = ptr[1].p;
  438.   //tRecAttr->setup(tAttrInfo, aValue)) {
  439.   Uint32 tAttrId;
  440.   Uint32 tDataSz;
  441.   for (; aAttrPtr < aAttrEndPtr; ) {
  442.     tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
  443.     tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
  444.     aAttrPtr++;
  445.     aDataPtr += tDataSz;
  446.   }
  447. }
  448. int NdbEventOperationImpl::wait(void *p, int aMillisecondNumber)
  449. {
  450.   return ((NdbGlobalEventBufferHandle*)p)->wait(aMillisecondNumber);
  451. }
  452. /*
  453.  * Global variable ndbGlobalEventBuffer
  454.  * Class NdbGlobalEventBufferHandle
  455.  * Class NdbGlobalEventBuffer
  456.  *
  457.  */
  458. #define ADD_DROP_LOCK_GUARDR(TYPE, FN) 
  459.   ndbGlobalEventBuffer->add_drop_lock(); 
  460.   ndbGlobalEventBuffer->lock(); 
  461.   TYPE r = ndbGlobalEventBuffer->FN; 
  462.   ndbGlobalEventBuffer->unlock(); 
  463.   if (r < 0) { 
  464.     ndbGlobalEventBuffer->add_drop_unlock(); 
  465.   } 
  466.   return r;
  467. }
  468. #define GUARDR(TYPE, FN) 
  469.   ndbGlobalEventBuffer->lock(); 
  470.   TYPE r = ndbGlobalEventBuffer->FN; 
  471.   ndbGlobalEventBuffer->unlock(); 
  472.   return r;
  473. }
  474. #define GUARD(FN) 
  475.   ndbGlobalEventBuffer->lock(); 
  476.   ndbGlobalEventBuffer->FN; 
  477.   ndbGlobalEventBuffer->unlock(); 
  478. }
  479. #define ADD_DROP_UNLOCK_GUARD(FN) 
  480.   GUARD(FN); 
  481.   ndbGlobalEventBuffer->add_drop_unlock(); 
  482. }
  483. #define GUARDBLOCK(BLOCK) 
  484.   ndbGlobalEventBuffer->lock(); 
  485.   BLOCK 
  486.   ndbGlobalEventBuffer->unlock(); 
  487. }
  488. /*
  489.  * Global variable ndbGlobalEventBuffer
  490.  *
  491.  */
  492. extern NdbMutex * ndb_global_event_buffer_mutex;
  493. static NdbGlobalEventBuffer *ndbGlobalEventBuffer=NULL;
  494. /*
  495.  * Class NdbGlobalEventBufferHandle
  496.  * Each Ndb object has a Handle.  This Handle is used to access the
  497.  * global NdbGlobalEventBuffer instance ndbGlobalEventBuffer
  498.  */
  499. NdbGlobalEventBufferHandle *
  500. NdbGlobalEventBuffer_init(int n) 
  501. {
  502.   return new NdbGlobalEventBufferHandle(n);
  503.   // return NdbGlobalEventBufferHandle::init(n);
  504. }
  505. void
  506. NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *h) 
  507. {
  508.   delete h;
  509. }
  510. NdbGlobalEventBufferHandle::NdbGlobalEventBufferHandle
  511. (int MAX_NUMBER_ACTIVE_EVENTS) : m_bufferL(0), m_nids(0)
  512. {
  513.   if ((p_cond = NdbCondition_Create()) ==  NULL) {
  514.     ndbout_c("NdbGlobalEventBufferHandle: NdbCondition_Create() failed");
  515.     exit(-1);
  516.   }
  517.   
  518.   NdbMutex_Lock(ndb_global_event_buffer_mutex);
  519.   if (ndbGlobalEventBuffer == NULL) {
  520.     if (ndbGlobalEventBuffer == NULL) {
  521.       ndbGlobalEventBuffer = new NdbGlobalEventBuffer();
  522.       if (!ndbGlobalEventBuffer) {
  523. NdbMutex_Unlock(ndb_global_event_buffer_mutex);
  524. ndbout_c("NdbGlobalEventBufferHandle:: failed to allocate ndbGlobalEventBuffer");
  525. exit(-1);
  526.       }
  527.     }
  528.   }
  529.   NdbMutex_Unlock(ndb_global_event_buffer_mutex);
  530.   GUARD(real_init(this,MAX_NUMBER_ACTIVE_EVENTS));
  531. }
  532. NdbGlobalEventBufferHandle::~NdbGlobalEventBufferHandle()
  533. {
  534.   NdbCondition_Destroy(p_cond);
  535.   ndbGlobalEventBuffer->lock();
  536.   ndbGlobalEventBuffer->real_remove(this);
  537.   ndbGlobalEventBuffer->unlock();
  538.   NdbMutex_Lock(ndb_global_event_buffer_mutex);
  539.   if (ndbGlobalEventBuffer->m_handlers.size() == 0) {
  540.     delete ndbGlobalEventBuffer;
  541.     ndbGlobalEventBuffer = NULL;
  542.   }
  543.   NdbMutex_Unlock(ndb_global_event_buffer_mutex);
  544. }
  545. void
  546. NdbGlobalEventBufferHandle::addBufferId(int bufferId)
  547. {
  548.   if (m_nids >= NDB_MAX_ACTIVE_EVENTS) {
  549.     ndbout_c("NdbGlobalEventBufferHandle::addBufferId error in paramerer setting");
  550.     exit(-1);
  551.   }
  552.   m_bufferIds[m_nids] = bufferId;
  553.   m_nids++;
  554. }
  555. void
  556. NdbGlobalEventBufferHandle::dropBufferId(int bufferId)
  557. {
  558.   for (int i = 0; i < m_nids; i++)
  559.     if (m_bufferIds[i] == bufferId) {
  560.       m_nids--;
  561.       for (; i < m_nids; i++)
  562. m_bufferIds[i] = m_bufferIds[i+1];
  563.       return;
  564.     }
  565.   ndbout_c("NdbGlobalEventBufferHandle::dropBufferId %d does not exist",
  566.    bufferId);
  567.   exit(-1);
  568. }
  569. /*
  570. NdbGlobalEventBufferHandle *
  571. NdbGlobalEventBufferHandle::init (int MAX_NUMBER_ACTIVE_EVENTS)
  572. {
  573.   return new NdbGlobalEventBufferHandle();
  574. }
  575. void
  576. NdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle)
  577. {
  578.   delete handle;
  579. }
  580. */
  581. int 
  582. NdbGlobalEventBufferHandle::prepareAddSubscribeEvent(Uint32 eventId,
  583.      int& hasSubscriber)
  584. {
  585.   ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventId, hasSubscriber));
  586. }
  587. void
  588. NdbGlobalEventBufferHandle::addSubscribeEvent
  589. (int bufferId, NdbEventOperationImpl *ndbEventOperationImpl)
  590. {
  591.   ADD_DROP_UNLOCK_GUARD(real_addSubscribeEvent(bufferId, ndbEventOperationImpl));
  592. }
  593. void
  594. NdbGlobalEventBufferHandle::unprepareAddSubscribeEvent(int bufferId)
  595. {
  596.   ADD_DROP_UNLOCK_GUARD(real_unprepareAddSubscribeEvent(bufferId));
  597. }
  598. int 
  599. NdbGlobalEventBufferHandle::prepareDropSubscribeEvent(int bufferId,
  600.      int& hasSubscriber)
  601. {
  602.   ADD_DROP_LOCK_GUARDR(int,real_prepareDropSubscribeEvent(bufferId, hasSubscriber));
  603. }
  604. void
  605. NdbGlobalEventBufferHandle::unprepareDropSubscribeEvent(int bufferId)
  606. {
  607.   ADD_DROP_UNLOCK_GUARD(real_unprepareDropSubscribeEvent(bufferId));
  608. }
  609. void 
  610. NdbGlobalEventBufferHandle::dropSubscribeEvent(int bufferId)
  611. {
  612.   ADD_DROP_UNLOCK_GUARD(real_dropSubscribeEvent(bufferId));
  613. }
  614. int 
  615. NdbGlobalEventBufferHandle::insertDataL(int bufferId,
  616. const SubTableData * const sdata,
  617. LinearSectionPtr ptr[3])
  618. {
  619.   GUARDR(int,real_insertDataL(bufferId,sdata,ptr));
  620. }
  621.  
  622. void
  623. NdbGlobalEventBufferHandle::latestGCI(int bufferId, Uint32 gci)
  624. {
  625.   GUARD(real_latestGCI(bufferId,gci));
  626. }
  627.  
  628. Uint32
  629. NdbGlobalEventBufferHandle::getLatestGCI()
  630. {
  631.   GUARDR(Uint32, real_getLatestGCI());
  632. }
  633.  
  634. inline void
  635. NdbGlobalEventBufferHandle::group_lock()
  636. {
  637.   ndbGlobalEventBuffer->group_lock();
  638. }
  639. inline void
  640. NdbGlobalEventBufferHandle::group_unlock()
  641. {
  642.   ndbGlobalEventBuffer->group_unlock();
  643. }
  644. int
  645. NdbGlobalEventBufferHandle::wait(int aMillisecondNumber)
  646. {
  647.   GUARDR(int, real_wait(this, aMillisecondNumber));
  648. }
  649. int NdbGlobalEventBufferHandle::getDataL(const int bufferId,
  650.  SubTableData * &sdata,
  651.  LinearSectionPtr ptr[3],
  652.  int *pOverrun)
  653. {
  654.   GUARDR(int,real_getDataL(bufferId,sdata,ptr,pOverrun));
  655. }
  656. /*
  657.  * Class NdbGlobalEventBuffer
  658.  *
  659.  *
  660.  */
  661. void
  662. NdbGlobalEventBuffer::lock()
  663. {
  664.   if (!m_group_lock_flag)
  665.     NdbMutex_Lock(ndb_global_event_buffer_mutex);
  666. }
  667. void
  668. NdbGlobalEventBuffer::unlock()
  669. {
  670.   if (!m_group_lock_flag)
  671.     NdbMutex_Unlock(ndb_global_event_buffer_mutex);
  672. }
  673. void
  674. NdbGlobalEventBuffer::add_drop_lock()
  675. {
  676.   NdbMutex_Lock(p_add_drop_mutex);
  677. }
  678. void
  679. NdbGlobalEventBuffer::add_drop_unlock()
  680. {
  681.   NdbMutex_Unlock(p_add_drop_mutex);
  682. }
  683. inline void
  684. NdbGlobalEventBuffer::group_lock()
  685. {
  686.   lock();
  687.   m_group_lock_flag = 1;
  688. }
  689. inline void
  690. NdbGlobalEventBuffer::group_unlock()
  691. {
  692.   m_group_lock_flag = 0;
  693.   unlock();
  694. }
  695. void
  696. NdbGlobalEventBuffer::lockB(int bufferId)
  697. {
  698.   NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex);
  699. }
  700. void
  701. NdbGlobalEventBuffer::unlockB(int bufferId)
  702. {
  703.   NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex);
  704. }
  705. // Private methods
  706. NdbGlobalEventBuffer::NdbGlobalEventBuffer() : 
  707.   m_handlers(),
  708.   m_group_lock_flag(0),
  709.   m_latestGCI(0),
  710.   m_no(0) // must start at ZERO!
  711. {
  712.   if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) {
  713.     ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
  714.     exit(-1);
  715.   }
  716. }
  717. NdbGlobalEventBuffer::~NdbGlobalEventBuffer()
  718. {
  719.   NdbMutex_Destroy(p_add_drop_mutex);
  720.   // NdbMem_Deallocate(m_eventBufferIdToEventId);
  721. }
  722. void
  723. NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h, 
  724.  int MAX_NUMBER_ACTIVE_EVENTS)
  725. {
  726.   if (m_handlers.size() == 0) { // First init
  727.     m_max = MAX_NUMBER_ACTIVE_EVENTS;
  728.     m_buf = new BufItem[m_max];
  729.       // (BufItem *)NdbMem_Allocate(m_max*sizeof(BufItem));
  730.     for (int i=0; i<m_max; i++) {
  731.       m_buf[i].gId = 0;
  732.     }
  733.   }
  734.   // TODO make sure we don't hit roof
  735.   //  m_handlers[m_nhandlers] = h;
  736.   m_handlers.push_back(h);
  737.   //  ndbout_c("NdbGlobalEventBuffer::real_init(), m_handles=%u %u", m_nhandlers, h);
  738. }
  739. void
  740. NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h)
  741. {
  742.   //  ndbout_c("NdbGlobalEventBuffer::real_init_remove(), m_handles=%u %u", m_nhandlers, h);
  743.   for (Uint32 i=0 ; i < m_handlers.size(); i++) {
  744.     //    ndbout_c("%u %u %u", i, m_handlers[i], h);
  745.     if (m_handlers[i] == h) {
  746.       m_handlers.erase(i);
  747.       if (m_handlers.size() == 0) {
  748. // ndbout_c("last to go");
  749. delete[] m_buf;
  750. m_buf = NULL;
  751. // NdbMem_Free((char*)m_buf);
  752.       }
  753.       return;
  754.     }
  755.   }
  756.   ndbout_c("NdbGlobalEventBuffer::real_init_remove() non-existing handle");
  757.   exit(-1);
  758. }
  759. int 
  760. NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
  761. (NdbGlobalEventBufferHandle *aHandle, Uint32 eventId, int& hasSubscriber)
  762. {
  763.   int i;
  764.   int bufferId = -1;
  765.   //  add_drop_lock(); // only one thread can do add or drop at a time
  766.   // Find place where eventId already set
  767.   for (i=0; i<m_no; i++) {
  768.     if (m_buf[i].gId == eventId) {
  769.       bufferId = i;
  770.       break;
  771.     }
  772.   }
  773.   if (bufferId < 0) {
  774.     // find space for new bufferId
  775.     for (i=0; i<m_no; i++) {
  776.       if (m_buf[i].gId == 0) {
  777. bufferId = i; // we found an empty spot
  778. break;
  779.       }
  780.     }
  781.     if (bufferId < 0 &&
  782. m_no < m_max) {
  783.       // room for more so get that
  784.       bufferId=m_no;
  785.       m_buf[m_no].gId = 0;
  786.       m_no++;
  787.     } else {
  788.       ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers");
  789.       //      add_drop_unlock();
  790.       return -1;
  791.     }
  792.   }
  793.   BufItem &b = m_buf[ID(bufferId)];
  794.   if (b.gId == 0) { // first subscriber needs some initialization
  795.     bufferId = NO_ID(0, bufferId);
  796.     b.gId = eventId;
  797.     if ((b.p_buf_mutex = NdbMutex_Create()) == NULL) {
  798.       ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
  799.       exit(-1);
  800.     }
  801.     b.subs = 0;
  802.     b.f = 0;
  803.     b.sz = 0;
  804.     b.max_sz = aHandle->m_bufferL;
  805.     b.data = 
  806.       (BufItem::Data *)NdbMem_Allocate(b.max_sz*sizeof(BufItem::Data));
  807.     for (int i = 0; i < b.max_sz; i++) {
  808.       b.data[i].sdata = NULL;
  809.       b.data[i].ptr[0].p = NULL;
  810.       b.data[i].ptr[1].p = NULL;
  811.       b.data[i].ptr[2].p = NULL;
  812.     }
  813.   } else {
  814. #ifdef EVENT_DEBUG
  815.     ndbout_c("NdbGlobalEventBuffer::prepareAddSubscribeEvent: TRYING handle one subscriber per event b.subs = %u", b.subs);
  816. #endif
  817.     int ni = -1;
  818.     for(int i=0; i < b.subs;i++) {
  819.       if (b.ps[i].theHandle == NULL) {
  820. ni = i;
  821. break;
  822.       }
  823.     }
  824.     if (ni < 0) {
  825.       if (b.subs < MAX_SUBSCRIBERS_PER_EVENT) {
  826. ni = b.subs;
  827.       } else {
  828. ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers");
  829. // add_drop_unlock();
  830. return -1;
  831.       }
  832.     }
  833.     bufferId = NO_ID(ni, bufferId);
  834.   }
  835.   // initialize BufItem::Ps
  836.   {
  837.     int n = NO(bufferId);
  838.     NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
  839.     e.theHandle = aHandle;
  840.     e.b=0;
  841.     e.bufferempty = 1;
  842.     e.overrun=0; // set to -1 to handle first insert
  843.   }
  844.   if (b.subs > 0)
  845.     hasSubscriber = 1;
  846.   else
  847.     hasSubscriber = 0;
  848. #ifdef EVENT_DEBUG
  849.   ndbout_c("prepareAddSubscribeEvent: handed out bufferId %d for eventId %d",
  850.    bufferId, eventId);
  851. #endif
  852.   /* we now have a lock on the prepare so that no one can mess with this
  853.    * unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent
  854.    */
  855.   return bufferId;
  856. }
  857. void
  858. NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
  859. {
  860.   BufItem &b = m_buf[ID(bufferId)];
  861.   int n = NO(bufferId);
  862.   b.ps[n].theHandle = NULL;
  863.   // remove subscribers from the end,
  864.   // we have to keep gaps since the position
  865.   // has been handed out in bufferId
  866.   for (int i = b.subs-1; i >= 0; i--)
  867.     if (b.ps[i].theHandle == NULL)
  868.       b.subs--;
  869.     else
  870.       break;
  871.   if (b.subs == 0) {
  872. #ifdef EVENT_DEBUG
  873.     ndbout_c("unprepareAddSubscribeEvent: no more subscribers left on eventId %d", b.gId);
  874. #endif
  875.     b.gId = 0;  // We don't have any subscribers, reuse BufItem
  876.     if (b.data) {
  877.       NdbMem_Free((void *)b.data);
  878.       b.data = NULL;
  879.     }
  880.     if (b.p_buf_mutex) {
  881.       NdbMutex_Destroy(b.p_buf_mutex);
  882.       b.p_buf_mutex = NULL;
  883.     }
  884.   }
  885.   //  add_drop_unlock();
  886. }
  887. void
  888. NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId, 
  889.      void *ndbEventOperation)
  890. {
  891.   BufItem &b = m_buf[ID(bufferId)];
  892.   int n = NO(bufferId);
  893.   b.subs++;
  894.   b.ps[n].theHandle->addBufferId(bufferId);
  895.   //  add_drop_unlock();
  896. #ifdef EVENT_DEBUG
  897.   ndbout_c("addSubscribeEvent:: added bufferId %d", bufferId);
  898. #endif
  899. }
  900. void
  901. NdbGlobalEventBuffer::real_unprepareDropSubscribeEvent(int bufferId)
  902. {
  903.   //  add_drop_unlock(); // only one thread can do add or drop at a time
  904. }
  905. int 
  906. NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId,
  907.      int& hasSubscriber)
  908. {
  909.   //  add_drop_lock(); // only one thread can do add or drop at a time
  910.   BufItem &b = m_buf[ID(bufferId)];
  911.   int n = 0;
  912.   for(int i=0; i < b.subs;i++) {
  913.     if (b.ps[i].theHandle != NULL)
  914.       n++;
  915.   }
  916.   if (n > 1)
  917.     hasSubscriber = 1;
  918.   else if (n == 1)
  919.     hasSubscriber = 0;
  920.   else
  921.     return -1;
  922.   return 0;
  923. }
  924. void
  925. NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId)
  926. {
  927.   //  add_drop_lock(); // only one thread can do add-drop at a time
  928.   BufItem &b = m_buf[ID(bufferId)];
  929.   int n = NO(bufferId);
  930.   b.ps[n].overrun=0;
  931.   b.ps[n].bufferempty=1;
  932.   b.ps[n].b=0;
  933.   b.ps[n].theHandle->dropBufferId(bufferId);
  934.   real_unprepareAddSubscribeEvent(bufferId); // does add_drop_unlock();
  935. #ifdef EVENT_DEBUG
  936.   ndbout_c("dropSubscribeEvent:: dropped bufferId %d", bufferId);
  937. #endif
  938. }
  939. void
  940. NdbGlobalEventBuffer::real_latestGCI(int bufferId, Uint32 gci)
  941. {
  942.   if (gci > m_latestGCI)
  943.     m_latestGCI = gci;
  944.   else if ((m_latestGCI-gci) > 0xffff) // If NDB stays up :-)
  945.     m_latestGCI = gci;
  946. }
  947. Uint32
  948. NdbGlobalEventBuffer::real_getLatestGCI()
  949. {
  950.   return m_latestGCI;
  951. }
  952. int
  953. NdbGlobalEventBuffer::real_insertDataL(int bufferId, 
  954.        const SubTableData * const sdata, 
  955.        LinearSectionPtr ptr[3])
  956. {
  957.   BufItem &b = m_buf[ID(bufferId)];
  958. #ifdef EVENT_DEBUG
  959.   int n = NO(bufferId);
  960. #endif
  961.   {
  962.     if (b.subs) {
  963. #ifdef EVENT_DEBUG
  964.       ndbout_c("data insertion in buffer %d with eventId %d", bufferId, b.gId);
  965. #endif
  966.       // move front forward
  967.       if (copy_data_alloc(sdata, ptr,
  968.   b.data[b.f].sdata, b.data[b.f].ptr))
  969. return -1;
  970.       for (int i=0; i < b.subs; i++) {
  971. NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[i];
  972. if (e.theHandle) { // active subscriber
  973.   if (b.f == e.b) { // next-to-read == written
  974.     if (e.bufferempty == 0) {
  975.       e.overrun++; // another item has been overwritten
  976.       e.b++; // move next-to-read next since old item was overwritten
  977.       if (e.b == b.max_sz) e.b = 0; // start from beginning
  978.     }
  979.   }
  980.   e.bufferempty = 0;
  981.   // signal subscriber that there's more to get
  982.   NdbCondition_Signal(e.theHandle->p_cond);
  983. }
  984.       }
  985.       b.f++; // move next-to-write
  986.       if (b.f == b.max_sz) b.f = 0; // start from beginning
  987. #ifdef EVENT_DEBUG
  988.       ndbout_c("Front= %d Back = %d overun = %d", b.f,
  989.        b.ps[n].b, b.ps[n].overrun);
  990. #endif
  991.     } else {
  992. #ifdef EVENT_DEBUG
  993.       ndbout_c("Data arrived before ready eventId", b.gId);
  994. #endif
  995.     }
  996.   }
  997.   return 0;
  998. }
  999. int NdbGlobalEventBuffer::hasData(int bufferId) {
  1000.   BufItem &b = m_buf[ID(bufferId)];
  1001.   int n = NO(bufferId);
  1002.   NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
  1003.   if(e.bufferempty)
  1004.     return 0;
  1005.   if (b.f <= e.b)
  1006.     return b.max_sz-e.b + b.f;
  1007.   else
  1008.     return b.f-e.b;
  1009. }
  1010. int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
  1011. SubTableData * &sdata,
  1012. LinearSectionPtr ptr[3],
  1013. int *pOverrun)
  1014. {
  1015.   BufItem &b = m_buf[ID(bufferId)];
  1016.   int n = NO(bufferId);
  1017.   NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
  1018.   if (pOverrun) {
  1019.     *pOverrun = e.overrun;
  1020.     e.overrun = 0; // if pOverrun is returned to user reset e.overrun
  1021.   }
  1022.   if (e.bufferempty)
  1023.     return 0; // nothing to get
  1024.   if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr,
  1025.       sdata, ptr))
  1026.     return -1;
  1027.   e.b++; if (e.b == b.max_sz) e.b = 0; // move next-to-read forward
  1028.   if (b.f == e.b) // back has cought up with front
  1029.     e.bufferempty = 1;
  1030. #ifdef EVENT_DEBUG
  1031.   ndbout_c("getting data from buffer %d with eventId %d", bufferId, b.gId);
  1032. #endif
  1033.   return hasData(bufferId)+1;
  1034. }
  1035. int 
  1036. NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
  1037.       LinearSectionPtr f_ptr[3],
  1038.       SubTableData * &t_sdata,
  1039.       LinearSectionPtr t_ptr[3])
  1040. {
  1041.   if (t_sdata == NULL) {
  1042.     t_sdata = (SubTableData *)NdbMem_Allocate(sizeof(SubTableData));
  1043.   }
  1044.   memcpy(t_sdata,f_sdata,sizeof(SubTableData));
  1045.   for (int i = 0; i < 3; i++) {
  1046.     LinearSectionPtr & f_p = f_ptr[i];
  1047.     LinearSectionPtr & t_p = t_ptr[i];
  1048.     if (f_p.sz > 0) {
  1049.       if (t_p.p == NULL) {
  1050. t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz);
  1051.       } else if (t_p.sz != f_p.sz) {
  1052. NdbMem_Free(t_p.p);
  1053. t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz);
  1054.       }
  1055.       memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz);
  1056.     } else if (t_p.p != NULL) {
  1057.       NdbMem_Free(t_p.p);
  1058.       t_p.p = NULL;
  1059.     }
  1060.     t_p.sz = f_p.sz;
  1061.   }
  1062.   return 0;
  1063. }
  1064. int
  1065. NdbGlobalEventBuffer::real_wait(NdbGlobalEventBufferHandle *h,
  1066. int aMillisecondNumber)
  1067. {
  1068.   // check if there are anything in any of the buffers
  1069.   int i;
  1070.   int n = 0;
  1071.   for (i = 0; i < h->m_nids; i++)
  1072.     n += hasData(h->m_bufferIds[i]);
  1073.   if (n) return n;
  1074.   int r = NdbCondition_WaitTimeout(h->p_cond, ndb_global_event_buffer_mutex,
  1075.    aMillisecondNumber);
  1076.   if (r > 0)
  1077.     return -1;
  1078.   n = 0;
  1079.   for (i = 0; i < h->m_nids; i++)
  1080.     n += hasData(h->m_bufferIds[i]);
  1081.   return n;
  1082. }
  1083. template class Vector<NdbGlobalEventBufferHandle*>;