Suma.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:102k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "Suma.hpp"
  14. #include <ndb_version.h>
  15. #include <NdbTCP.h>
  16. #include <Bitmask.hpp>
  17. #include <SimpleProperties.hpp>
  18. #include <signaldata/NodeFailRep.hpp>
  19. #include <signaldata/ReadNodesConf.hpp>
  20. #include <signaldata/ListTables.hpp>
  21. #include <signaldata/GetTabInfo.hpp>
  22. #include <signaldata/GetTableId.hpp>
  23. #include <signaldata/DictTabInfo.hpp>
  24. #include <signaldata/SumaImpl.hpp>
  25. #include <signaldata/ScanFrag.hpp>
  26. #include <signaldata/TransIdAI.hpp>
  27. #include <signaldata/CreateTrig.hpp>
  28. #include <signaldata/AlterTrig.hpp>
  29. #include <signaldata/DropTrig.hpp>
  30. #include <signaldata/FireTrigOrd.hpp>
  31. #include <signaldata/TrigAttrInfo.hpp>
  32. #include <signaldata/CheckNodeGroups.hpp>
  33. #include <signaldata/GCPSave.hpp>
  34. #include <GrepError.hpp>
  35. #include <DebuggerNames.hpp>
  36. //#define HANDOVER_DEBUG
  37. //#define NODEFAIL_DEBUG
  38. //#define NODEFAIL_DEBUG2
  39. //#define DEBUG_SUMA_SEQUENCE
  40. //#define EVENT_DEBUG
  41. //#define EVENT_PH3_DEBUG
  42. //#define EVENT_DEBUG2
  43. /**
  44.  * @todo:
  45.  * SUMA crashes if an index is created at the same time as
  46.  * global replication. Very easy to reproduce using testIndex.
  47.  * Note: This only happens occasionally, but is quite easy to reprod.
  48.  */
  49. Uint32 g_subPtrI = RNIL;
  50. static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;
  51. /**************************************************************
  52.  *
  53.  * Start of suma
  54.  *
  55.  */
  56. #define PRINT_ONLY 0
  57. static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;
  58. void
  59. Suma::getNodeGroupMembers(Signal* signal) {
  60.   jam();
  61.   /**
  62.    * Ask DIH for nodeGroupMembers
  63.    */
  64.   CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
  65.   sd->blockRef = reference();
  66.   sd->requestType =
  67.     CheckNodeGroups::Direct |
  68.     CheckNodeGroups::GetNodeGroupMembers;
  69.   sd->nodeId = getOwnNodeId();
  70.   EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal, 
  71.  CheckNodeGroups::SignalLength);
  72.   jamEntry();
  73.   
  74.   c_nodeGroup = sd->output;
  75.   c_noNodesInGroup = 0;
  76.   for (int i = 0; i < MAX_NDB_NODES; i++) {
  77.     if (sd->mask.get(i)) {
  78.       if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup;
  79.       c_nodesInGroup[c_noNodesInGroup] = i;
  80.       c_noNodesInGroup++;
  81.     }
  82.   }
  83.   //  ndbout_c("c_noNodesInGroup=%d", c_noNodesInGroup);
  84.   ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup
  85. #ifdef NODEFAIL_DEBUG
  86.   for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
  87.     ndbout_c ("Suma: NodeGroup %u, me %u, me in group %u, member[%u] %u",
  88.       c_nodeGroup, getOwnNodeId(), c_idInNodeGroup,
  89.       i, c_nodesInGroup[i]);
  90.   }
  91. #endif
  92. }
  93. void
  94. Suma::execSTTOR(Signal* signal) {
  95.   jamEntry();                            
  96.   
  97.   const Uint32 startphase  = signal->theData[1];
  98.   const Uint32 typeOfStart = signal->theData[7];
  99. #ifdef NODEFAIL_DEBUG
  100.   ndbout_c ("SUMA::execSTTOR startphase = %u, typeOfStart = %u",
  101.     startphase, typeOfStart);
  102. #endif
  103.   if(startphase == 1){
  104.     jam();
  105.     c_restartLock = true;
  106.   }
  107.   if(startphase == 3){
  108.     jam();
  109.     g_TypeOfStart = typeOfStart;
  110.     signal->theData[0] = reference();
  111.     sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
  112. #if 0
  113.     /**
  114.      * Debug
  115.      */
  116.     
  117.     SubscriptionPtr subPtr;
  118.     Ptr<SyncRecord> syncPtr;
  119.     ndbrequire(c_subscriptions.seize(subPtr));
  120.     ndbrequire(c_syncPool.seize(syncPtr));
  121.     
  122.     ndbout_c("Suma: subPtr.i = %d syncPtr.i = %d", subPtr.i, syncPtr.i);
  123.     subPtr.p->m_syncPtrI = syncPtr.i;
  124.     subPtr.p->m_subscriptionType = SubCreateReq::DatabaseSnapshot;
  125.     syncPtr.p->m_subscriptionPtrI = subPtr.i;
  126.     syncPtr.p->ptrI = syncPtr.i;
  127.     g_subPtrI = subPtr.i;
  128.     //    sendSTTORRY(signal);
  129. #endif    
  130.     return;
  131.   }
  132.   if(startphase == 5) {
  133.     getNodeGroupMembers(signal);
  134.     if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
  135.       jam();
  136.       for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
  137. Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
  138. if (ref != reference())
  139.   sendSignal(ref, GSN_SUMA_START_ME, signal,
  140.      1 /*SumaStartMe::SignalLength*/, JBB);
  141.       }
  142.     }
  143.   }
  144.   
  145.   if(startphase == 7) {
  146.     c_restartLock = false; // may be set false earlier with HANDOVER_REQ
  147.     
  148.     if (g_TypeOfStart != NodeState::ST_NODE_RESTART) {
  149.       for( int i = 0; i < NO_OF_BUCKETS; i++) {
  150. if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
  151.   // I'm running this bucket
  152. #ifdef EVENT_DEBUG
  153.   ndbout_c("bucket %u set to true", i);
  154. #endif
  155.   c_buckets[i].active = true;
  156. }
  157.       }
  158.     }
  159.     if(g_TypeOfStart == NodeState::ST_INITIAL_START &&
  160.        c_masterNodeId == getOwnNodeId()) {
  161.       jam();
  162.       createSequence(signal);
  163.       return;
  164.     }//if
  165.   }//if
  166.   
  167.   sendSTTORRY(signal);
  168.   
  169.   return;
  170. }
  171. void
  172. Suma::createSequence(Signal* signal)
  173. {
  174.   jam();
  175.   UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
  176.   
  177.   req->senderData  = RNIL;
  178.   req->sequenceId  = SUMA_SEQUENCE;
  179.   req->requestType = UtilSequenceReq::Create;
  180. #ifdef DEBUG_SUMA_SEQUENCE
  181.   ndbout_c("SUMA: Create sequence");
  182. #endif
  183.   sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 
  184.      signal, UtilSequenceReq::SignalLength, JBB);
  185.   // execUTIL_SEQUENCE_CONF will call createSequenceReply()
  186. }
  187. void
  188. Suma::createSequenceReply(Signal* signal,
  189.   UtilSequenceConf * conf,
  190.   UtilSequenceRef * ref)
  191. {
  192.   jam();
  193.   if (ref != NULL)
  194.     ndbrequire(false);
  195.   sendSTTORRY(signal);
  196. }
  197. void
  198. Suma::execREAD_NODESCONF(Signal* signal){
  199.   jamEntry();
  200.   ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
  201.  
  202.   c_aliveNodes.clear();
  203.   c_preparingNodes.clear();
  204.   Uint32 count = 0;
  205.   for(Uint32 i = 0; i < MAX_NDB_NODES; i++){
  206.     if(NodeBitmask::get(conf->allNodes, i)){
  207.       jam();
  208.       
  209.       count++;
  210.       NodePtr node;
  211.       ndbrequire(c_nodes.seize(node));
  212.       
  213.       node.p->nodeId = i;
  214.       if(NodeBitmask::get(conf->inactiveNodes, i)){
  215. jam();
  216. node.p->alive = 0;
  217.       } else {
  218. jam();
  219. node.p->alive = 1;
  220. c_aliveNodes.set(i);
  221.       }
  222.     } else
  223.       jam();
  224.   }
  225.   c_masterNodeId = conf->masterNodeId;
  226.   ndbrequire(count == conf->noOfNodes);
  227.   sendSTTORRY(signal);
  228. }
  229. #if 0
  230. void
  231. Suma::execREAD_CONFIG_REQ(Signal* signal) 
  232. {
  233.   const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
  234.   Uint32 ref = req->senderRef;
  235.   Uint32 senderData = req->senderData;
  236.   ndbrequire(req->noOfParameters == 0);
  237.   jamEntry();
  238.   const ndb_mgm_configuration_iterator * p = 
  239.     theConfiguration.getOwnConfigIterator();
  240.   ndbrequire(p != 0);
  241.   
  242.   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_FILES, 
  243. &cnoLogFiles));
  244.   ndbrequire(cnoLogFiles > 0);
  245.   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_FRAG, &cfragrecFileSize));
  246.   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TABLE, &ctabrecFileSize));
  247.   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TC_CONNECT, 
  248. &ctcConnectrecFileSize));
  249.   clogFileFileSize       = 4 * cnoLogFiles;
  250.   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize));
  251.   cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_SCANS_PER_FRAG;
  252.   initRecords();
  253.   initialiseRecordsLab(signal, 0, ref, senderData);
  254.   
  255.   return;
  256. }//Dblqh::execSIZEALT_REP()
  257. #endif
  258. void
  259. Suma::sendSTTORRY(Signal* signal){
  260.   signal->theData[0] = 0;
  261.   signal->theData[3] = 1;
  262.   signal->theData[4] = 3;
  263.   signal->theData[5] = 5;
  264.   signal->theData[6] = 7;
  265.   signal->theData[7] = 255; // No more start phases from missra
  266.   sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);
  267. }
  268. void
  269. Suma::execNDB_STTOR(Signal* signal) 
  270. {
  271.   jamEntry();                            
  272. }
  273. void
  274. Suma::execCONTINUEB(Signal* signal){
  275.   jamEntry();
  276. }
  277. void
  278. SumaParticipant::execCONTINUEB(Signal* signal) 
  279. {
  280.   jamEntry();
  281. }
  282. /*****************************************************************************
  283.  * 
  284.  * Node state handling
  285.  *
  286.  *****************************************************************************/
  287. void Suma::execAPI_FAILREQ(Signal* signal) 
  288. {
  289.   jamEntry();
  290.   Uint32 failedApiNode = signal->theData[0];
  291.   //BlockReference retRef = signal->theData[1];
  292.   c_failedApiNodes.set(failedApiNode);
  293.   bool found = removeSubscribersOnNode(signal, failedApiNode);
  294.   if(!found){
  295.     jam();
  296.     c_failedApiNodes.clear(failedApiNode);
  297.   }
  298. }//execAPI_FAILREQ()
  299. bool
  300. SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
  301. {
  302.   bool found = false;
  303.   SubscriberPtr i_subbPtr;
  304.   c_dataSubscribers.first(i_subbPtr);
  305.   while(!i_subbPtr.isNull()){
  306.     SubscriberPtr subbPtr = i_subbPtr;
  307.     c_dataSubscribers.next(i_subbPtr);
  308.     jam();
  309.     if (refToNode(subbPtr.p->m_subscriberRef) == nodeId) {
  310.       jam();
  311.       c_dataSubscribers.remove(subbPtr);
  312.       c_removeDataSubscribers.add(subbPtr);
  313.       found = true;
  314.     }
  315.   }
  316.   if(found){
  317.     jam();
  318.     sendSubStopReq(signal);
  319.   }
  320.   return found;
  321. }
  322. void
  323. SumaParticipant::sendSubStopReq(Signal *signal){
  324.   static bool remove_lock = false;
  325.   jam();
  326.   if(remove_lock) {
  327.     jam();
  328.     return;
  329.   }
  330.   remove_lock = true;
  331.   SubscriberPtr subbPtr;
  332.   c_removeDataSubscribers.first(subbPtr);
  333.   if (subbPtr.isNull()){
  334.     jam();
  335. #if 0
  336.     signal->theData[0] = failedApiNode;
  337.     signal->theData[1] = reference();
  338.     sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
  339. #endif
  340.     c_failedApiNodes.clear();
  341.     remove_lock = false;
  342.     return;
  343.   }
  344.   SubscriptionPtr subPtr;
  345.   c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
  346.   SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
  347.   req->senderRef       = reference();
  348.   req->senderData      = subbPtr.i;
  349.   req->subscriberRef   = subbPtr.p->m_subscriberRef;
  350.   req->subscriberData  = subbPtr.p->m_subscriberData;
  351.   req->subscriptionId  = subPtr.p->m_subscriptionId;
  352.   req->subscriptionKey = subPtr.p->m_subscriptionKey;
  353.   req->part = SubscriptionData::TableData;
  354.   sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
  355. }
  356. void
  357. SumaParticipant::execSUB_STOP_CONF(Signal* signal){
  358.   jamEntry();
  359.   SubStopConf * const conf = (SubStopConf*)signal->getDataPtr();
  360.   //  Uint32 subscriberData = conf->subscriberData;
  361.   //  Uint32 subscriberRef = conf->subscriberRef;
  362.   Subscription key; 
  363.   key.m_subscriptionId = conf->subscriptionId;
  364.   key.m_subscriptionKey = conf->subscriptionKey;
  365.   SubscriptionPtr subPtr;
  366.   if(c_subscriptions.find(subPtr, key)) {
  367.     jam();
  368.     if (subPtr.p->m_markRemove) {
  369.       jam();
  370.       ndbrequire(false);
  371.       ndbrequire(subPtr.p->m_nSubscribers > 0);
  372.       subPtr.p->m_nSubscribers--;
  373.       if (subPtr.p->m_nSubscribers == 0){
  374. jam();
  375. completeSubRemoveReq(signal, subPtr);
  376.       }
  377.     }
  378.   }
  379.   sendSubStopReq(signal);
  380. }
  381. void
  382. SumaParticipant::execSUB_STOP_REF(Signal* signal){
  383.   jamEntry();
  384.   SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();
  385.   Uint32 subscriptionId = ref->subscriptionId;
  386.   Uint32 subscriptionKey = ref->subscriptionKey;
  387.   Uint32 part = ref->part;
  388.   Uint32 subscriberData = ref->subscriberData;
  389.   Uint32 subscriberRef = ref->subscriberRef;
  390.   //  Uint32 err = ref->err;
  391.   if(!ref->isTemporary()){
  392.     ndbrequire(false);
  393.   }
  394.   SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
  395.   req->subscriberRef = subscriberRef;
  396.   req->subscriberData = subscriberData;
  397.   req->subscriptionId = subscriptionId;
  398.   req->subscriptionKey = subscriptionKey;
  399.   req->part = part;
  400.   sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
  401. }
  402. void
  403. Suma::execNODE_FAILREP(Signal* signal){
  404.   jamEntry();
  405.   NodeFailRep * const rep = (NodeFailRep*)signal->getDataPtr();
  406.   
  407.   bool changed = false;
  408.   NodePtr nodePtr;
  409. #ifdef NODEFAIL_DEBUG
  410.   ndbout_c("Suma: nodefailrep");
  411. #endif
  412.   c_nodeFailGCI = getFirstGCI(signal);
  413.   for(c_nodes.first(nodePtr); nodePtr.i != RNIL; c_nodes.next(nodePtr)){
  414.     if(NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)){
  415.       if(nodePtr.p->alive){
  416. ndbassert(c_aliveNodes.get(nodePtr.p->nodeId));
  417. changed = true;
  418. jam();
  419.       } else {
  420. ndbassert(!c_aliveNodes.get(nodePtr.p->nodeId));
  421. jam();
  422.       }
  423.       
  424.       if (c_preparingNodes.get(nodePtr.p->nodeId)) {
  425. jam();
  426. // we are currently preparing this node that died
  427. // it's ok just to clear and go back to waiting for it to start up
  428. Restart.resetNode(calcSumaBlockRef(nodePtr.p->nodeId));
  429. c_preparingNodes.clear(nodePtr.p->nodeId);
  430.       } else if (c_handoverToDo) {
  431. jam();
  432. // TODO what if I'm a SUMA that is currently restarting and the SUMA
  433. // responsible for restarting me is the one that died?
  434. // a node has failed whilst handover is going on
  435. // let's check if we're in the process of handover with that node
  436. c_handoverToDo = false;
  437. for( int i = 0; i < NO_OF_BUCKETS; i++) {
  438.   if (c_buckets[i].handover) {
  439.     // I'm doing handover, but is it with the dead node?
  440.     if (getResponsibleSumaNodeId(i) == nodePtr.p->nodeId) {
  441.       // so it was the dead node, has handover started?
  442.       if (c_buckets[i].handover_started) {
  443. jam();
  444. // we're not ok and will have lost data!
  445. // set not active to indicate this -
  446. // this will generate takeover behaviour
  447. c_buckets[i].active = false;
  448. c_buckets[i].handover_started = false;
  449.       } // else we're ok to revert back to state before 
  450.       c_buckets[i].handover = false;
  451.     } else {
  452.       jam();
  453.       // ok, we're doing handover with a different node
  454.       c_handoverToDo = true;
  455.     }
  456.   }
  457. }
  458.       }
  459.       c_failoverBuffer.nodeFailRep();
  460.       nodePtr.p->alive = 0;
  461.       c_aliveNodes.clear(nodePtr.p->nodeId); // this has to be done after the loop above
  462.     }
  463.   }
  464. }
  465. void
  466. Suma::execINCL_NODEREQ(Signal* signal){
  467.   jamEntry();
  468.   
  469.   //const Uint32 senderRef = signal->theData[0];
  470.   const Uint32 inclNode  = signal->theData[1];
  471.   NodePtr node;
  472.   for(c_nodes.first(node); node.i != RNIL; c_nodes.next(node)){
  473.     jam();
  474.     const Uint32 nodeId = node.p->nodeId;
  475.     if(inclNode == nodeId){
  476.       jam();
  477.       
  478.       ndbrequire(node.p->alive == 0);
  479.       ndbrequire(!c_aliveNodes.get(nodeId));
  480.       
  481.       for (Uint32 j = 0; j < c_noNodesInGroup; j++) {
  482.         jam();
  483. if (c_nodesInGroup[j] == nodeId) {
  484.   // the starting node is part of my node group
  485.           jam();
  486.   c_preparingNodes.set(nodeId); // set as being prepared
  487.   for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
  488.             jam();
  489.     if (i == c_idInNodeGroup) {
  490.               jam();
  491.       // I'm responsible for restarting this SUMA
  492.       // ALL dict's should have meta data info so it is ok to start
  493.       Restart.startNode(signal, calcSumaBlockRef(nodeId));
  494.       break;
  495.     }//if
  496.     if (c_aliveNodes.get(c_nodesInGroup[i])) {
  497.               jam();
  498.       break; // another Suma takes care of this
  499.     }//if
  500.   }//for
  501.   break;
  502. }//if
  503.       }//for
  504.       node.p->alive = 1;
  505.       c_aliveNodes.set(nodeId);
  506.       break;
  507.     }//if
  508.   }//for
  509. #if 0 // if we include this DIH's got to be prepared, later if needed...
  510.   signal->theData[0] = reference();
  511.   
  512.   sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);
  513. #endif
  514. }
  515. void
  516. Suma::execSIGNAL_DROPPED_REP(Signal* signal){
  517.   jamEntry();
  518.   ndbrequire(false);
  519. }
  520. /********************************************************************
  521.  *
  522.  * Dump state
  523.  *
  524.  */
  525. void
  526. Suma::execDUMP_STATE_ORD(Signal* signal){
  527.   jamEntry();
  528.   Uint32 tCase = signal->theData[0];
  529.   if(tCase >= 8000 && tCase <= 8003){
  530.     SubscriptionPtr subPtr;
  531.     c_subscriptions.getPtr(subPtr, g_subPtrI);
  532.     
  533.     Ptr<SyncRecord> syncPtr;
  534.     c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  535.     
  536.     if(tCase == 8000){
  537.       syncPtr.p->startMeta(signal);
  538.     }
  539.     
  540.     if(tCase == 8001){
  541.       syncPtr.p->startScan(signal);
  542.     }
  543.     if(tCase == 8002){
  544.       syncPtr.p->startTrigger(signal);
  545.     }
  546.     
  547.     if(tCase == 8003){
  548.       subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;
  549.       LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList);
  550.       Uint32 tab = 0;
  551.       Uint32 att[] = { 0, 1, 1 };
  552.       syncPtr.p->m_tableList.append(&tab, 1);
  553.       attrs.append(att, 3);
  554.     }
  555.   }
  556.   if(tCase == 8004){
  557.     infoEvent("Suma: c_subscriberPool  size: %d free: %d",
  558.       c_subscriberPool.getSize(),
  559.       c_subscriberPool.getNoOfFree());
  560.     infoEvent("Suma: c_tablePool  size: %d free: %d",
  561.       c_tablePool_.getSize(),
  562.       c_tablePool_.getNoOfFree());
  563.     infoEvent("Suma: c_subscriptionPool  size: %d free: %d",
  564.       c_subscriptionPool.getSize(),
  565.       c_subscriptionPool.getNoOfFree());
  566.     infoEvent("Suma: c_syncPool  size: %d free: %d",
  567.       c_syncPool.getSize(),
  568.       c_syncPool.getNoOfFree());
  569.     infoEvent("Suma: c_dataBufferPool  size: %d free: %d",
  570.       c_dataBufferPool.getSize(),
  571.       c_dataBufferPool.getNoOfFree());
  572.   }
  573. }
  574. /********************************************************************
  575.  *
  576.  * Convert a table name (db+schema+tablename) to tableId
  577.  *
  578.  */
  579. #if 0
  580. void
  581. SumaParticipant::convertNameToId(SubscriptionPtr subPtr, Signal * signal)
  582. {
  583.   jam();
  584.   if(subPtr.p->m_currentTable < subPtr.p->m_maxTables) {
  585.     jam();
  586.     GetTableIdReq * req = (GetTableIdReq *)signal->getDataPtrSend();
  587.     char * tableName = subPtr.p->m_tableNames[subPtr.p->m_currentTable];
  588.     const Uint32 strLen = strlen(tableName) + 1; // NULL Terminated
  589.     req->senderRef  = reference();
  590.     req->senderData = subPtr.i;
  591.     req->len        = strLen;
  592.     LinearSectionPtr ptr[1];
  593.     ptr[0].p  = (Uint32*)tableName;
  594.     ptr[0].sz = strLen;
  595.     sendSignal(DBDICT_REF,
  596.        GSN_GET_TABLEID_REQ, 
  597.        signal, 
  598.        GetTableIdReq::SignalLength,
  599.        JBB,
  600.        ptr,
  601.        1);
  602.   } else {
  603.     jam();
  604.     sendSubCreateConf(signal, subPtr.p->m_subscriberRef, subPtr);
  605.   }
  606. }
  607. #endif
  608. void 
  609. SumaParticipant::addTableId(Uint32 tableId,
  610.     SubscriptionPtr subPtr, SyncRecord *psyncRec)
  611. {
  612. #ifdef NODEFAIL_DEBUG
  613.   ndbout_c("SumaParticipant::addTableId(%u,%u,%u), current_table=%u",
  614.    tableId, subPtr.i, psyncRec, subPtr.p->m_currentTable);
  615. #endif
  616.   subPtr.p->m_tables[tableId] = 1;
  617.   subPtr.p->m_currentTable++;
  618.   if(psyncRec != NULL)
  619.     psyncRec->m_tableList.append(&tableId, 1);  
  620. }
  621. #if 0
  622. void 
  623. SumaParticipant::execGET_TABLEID_CONF(Signal * signal)
  624. {
  625.   jamEntry();
  626.   GetTableIdConf* conf = (GetTableIdConf *)signal->getDataPtr();
  627.   Uint32 tableId = conf->tableId;
  628.   //Uint32 schemaVersion = conf->schemaVersion;  
  629.   Uint32 senderData = conf->senderData;
  630.   SubscriptionPtr subPtr;
  631.   Ptr<SyncRecord> syncPtr;
  632.   c_subscriptions.getPtr(subPtr, senderData);
  633.   c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);  
  634.   /*
  635.    * add to m_tableList
  636.    */
  637.   addTableId(tableId, subPtr, syncPtr.p);
  638.   convertNameToId(subPtr, signal);
  639. }
  640. void 
  641. SumaParticipant::execGET_TABLEID_REF(Signal * signal)
  642. {
  643.   jamEntry();
  644.   GetTableIdRef const * ref = (GetTableIdRef *)signal->getDataPtr();
  645.   Uint32 senderData         = ref->senderData;
  646.   //  Uint32 err                = ref->err;
  647.   
  648.   SubscriptionPtr subPtr;
  649.   c_subscriptions.getPtr(subPtr, senderData);
  650.   Uint32 subData = subPtr.p->m_subscriberData;
  651.   SubCreateRef * reff = (SubCreateRef*)ref;
  652.   /**
  653.    * @todo: map ref->err to GrepError.
  654.    */
  655.   reff->err = GrepError::SELECTED_TABLE_NOT_FOUND;
  656.   reff->subscriberData = subData;
  657.   sendSignal(subPtr.p->m_subscriberRef,
  658.      GSN_SUB_CREATE_REF, 
  659.      signal, 
  660.      SubCreateRef::SignalLength,
  661.      JBB);
  662. }
  663. #endif
  664. /*************************************************************
  665.  *
  666.  * Creation of subscription id's
  667.  *
  668.  ************************************************************/
  669. void 
  670. Suma::execCREATE_SUBID_REQ(Signal* signal) 
  671. {
  672.   jamEntry();
  673.   CRASH_INSERTION(13001);
  674.   CreateSubscriptionIdReq const * req =
  675.     (CreateSubscriptionIdReq*)signal->getDataPtr();
  676.   SubscriberPtr subbPtr;
  677.   if(!c_subscriberPool.seize(subbPtr)){
  678.     jam();
  679.     sendSubIdRef(signal, GrepError::SUBSCRIPTION_ID_NOMEM);
  680.     return;
  681.   }
  682.   subbPtr.p->m_subscriberRef  = signal->getSendersBlockRef(); 
  683.   subbPtr.p->m_senderData     = req->senderData;
  684.   subbPtr.p->m_subscriberData = subbPtr.i;
  685.   UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
  686.    
  687.   utilReq->senderData  = subbPtr.p->m_subscriberData;
  688.   utilReq->sequenceId  = SUMA_SEQUENCE;
  689.   utilReq->requestType = UtilSequenceReq::NextVal;
  690.   sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 
  691.      signal, UtilSequenceReq::SignalLength, JBB);
  692. }
  693. void
  694. Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
  695. {
  696.   jamEntry();
  697.   CRASH_INSERTION(13002);
  698.   UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
  699. #ifdef DEBUG_SUMA_SEQUENCE
  700.   ndbout_c("SUMA: Create sequence conf");
  701. #endif
  702.   if(conf->requestType == UtilSequenceReq::Create) {
  703.     jam();
  704.     createSequenceReply(signal, conf, NULL);
  705.     return;
  706.   }
  707.   Uint64 subId;
  708.   memcpy(&subId,conf->sequenceValue,8);
  709.   Uint32 subData = conf->senderData;
  710.   SubscriberPtr subbPtr;
  711.   c_subscriberPool.getPtr(subbPtr,subData);
  712.   
  713.   CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf;
  714.   subconf->subscriptionId = (Uint32)subId;
  715.   subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);
  716.   subconf->subscriberData = subbPtr.p->m_senderData;
  717.   
  718.   sendSignal(subbPtr.p->m_subscriberRef, GSN_CREATE_SUBID_CONF, signal,
  719.      CreateSubscriptionIdConf::SignalLength, JBB);
  720.   c_subscriberPool.release(subbPtr);
  721. }
  722. void
  723. Suma::execUTIL_SEQUENCE_REF(Signal* signal)
  724. {
  725.   jamEntry();
  726.   UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();
  727.   if(ref->requestType == UtilSequenceReq::Create) {
  728.     jam();
  729.     createSequenceReply(signal, NULL, ref);
  730.     return;
  731.   }
  732.   Uint32 subData = ref->senderData;
  733.   SubscriberPtr subbPtr;
  734.   c_subscriberPool.getPtr(subbPtr,subData);
  735.   sendSubIdRef(signal, GrepError::SEQUENCE_ERROR);
  736.   c_subscriberPool.release(subbPtr);
  737.   return;
  738. }//execUTIL_SEQUENCE_REF()
  739. void
  740. SumaParticipant::sendSubIdRef(Signal* signal, Uint32 errCode){
  741.   jam();
  742.   CreateSubscriptionIdRef  * ref = 
  743.     (CreateSubscriptionIdRef *)signal->getDataPtrSend();
  744.   ref->err = errCode;
  745.   sendSignal(signal->getSendersBlockRef(), 
  746.      GSN_CREATE_SUBID_REF,
  747.      signal, 
  748.      CreateSubscriptionIdRef::SignalLength,
  749.      JBB);
  750.   
  751.   releaseSections(signal);  
  752.   return;
  753. }
  754. /**********************************************************
  755.  * Suma participant interface
  756.  *
  757.  * Creation of subscriptions
  758.  */
  759. void
  760. SumaParticipant::execSUB_CREATE_REQ(Signal* signal) {
  761. #ifdef NODEFAIL_DEBUG
  762.   ndbout_c("SumaParticipant::execSUB_CREATE_REQ");
  763. #endif
  764.   jamEntry();                            
  765.   CRASH_INSERTION(13003);
  766.   const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();    
  767.   
  768.   const Uint32 subId   = req.subscriptionId;
  769.   const Uint32 subKey  = req.subscriptionKey;
  770.   const Uint32 subRef  = req.subscriberRef;
  771.   const Uint32 subData = req.subscriberData;
  772.   const Uint32 type    = req.subscriptionType & SubCreateReq::RemoveFlags;
  773.   const Uint32 flags   = req.subscriptionType & SubCreateReq::GetFlags;
  774.   const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;
  775.   const bool restartFlag  = (flags & SubCreateReq::RestartFlag)  != 0;
  776.   const Uint32 sender = signal->getSendersBlockRef();
  777.   Subscription key;
  778.   key.m_subscriptionId  = subId;
  779.   key.m_subscriptionKey = subKey;
  780.   SubscriptionPtr subPtr;
  781.   Ptr<SyncRecord> syncPtr;
  782.   
  783.   if (addTableFlag) {
  784.     ndbrequire(restartFlag);  //TODO remove this
  785.     if(!c_subscriptions.find(subPtr, key)) {
  786.       jam();
  787.       sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_NOT_FOUND);
  788.       return;
  789.     }
  790.     jam();
  791.     c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  792.   } else {
  793.     // Check that id/key is unique
  794.     if(c_subscriptions.find(subPtr, key)) {
  795.       jam();
  796.       sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE);
  797.       return;
  798.     }
  799.     if(!c_subscriptions.seize(subPtr)) {
  800.       jam();
  801.       sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);
  802.       return;
  803.     }
  804.     if(!c_syncPool.seize(syncPtr)) {
  805.       jam();
  806.       sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);
  807.       return;
  808.     }
  809.     jam();
  810.     subPtr.p->m_subscriberRef    = subRef;
  811.     subPtr.p->m_subscriberData   = subData;
  812.     subPtr.p->m_subscriptionId   = subId;
  813.     subPtr.p->m_subscriptionKey  = subKey;
  814.     subPtr.p->m_subscriptionType = type;
  815.   
  816.     /**
  817.      * ok to memset? Support on all compilers
  818.      * @todo find out if memset is supported by all compilers
  819.      */
  820.     memset(subPtr.p->m_tables,0,MAX_TABLES);
  821.     subPtr.p->m_maxTables    = 0;
  822.     subPtr.p->m_currentTable = 0;
  823.     subPtr.p->m_syncPtrI   = syncPtr.i;
  824.     subPtr.p->m_markRemove = false;
  825.     subPtr.p->m_nSubscribers = 0;
  826.     c_subscriptions.add(subPtr);
  827.     syncPtr.p->m_subscriptionPtrI = subPtr.i;
  828.     syncPtr.p->m_doSendSyncData   = true;
  829.     syncPtr.p->ptrI               = syncPtr.i;
  830.     syncPtr.p->m_locked           = false;
  831.     syncPtr.p->m_error            = false;
  832.   }
  833.   if (restartFlag || 
  834.       type == SubCreateReq::TableEvent) {
  835.     syncPtr.p->m_doSendSyncData = false;
  836.     ndbrequire(type != SubCreateReq::SingleTableScan);
  837.     jam();
  838.     if (subPtr.p->m_tables[req.tableId] != 0) {
  839.       ndbrequire(false); //TODO remove
  840.       jam();
  841.       sendSubCreateRef(signal, req, GrepError::SELECTED_TABLE_ALREADY_ADDED);
  842.       return;
  843.     }
  844.     if (addTableFlag) {
  845.       ndbrequire(type != SubCreateReq::TableEvent);
  846.       jam();
  847.     }
  848.     subPtr.p->m_maxTables++;
  849.     addTableId(req.tableId, subPtr, syncPtr.p);
  850.   } else {
  851.     switch(type){
  852.     case SubCreateReq::SingleTableScan:
  853.       {
  854. jam();
  855. syncPtr.p->m_tableList.append(&req.tableId, 1);
  856. if(signal->getNoOfSections() > 0){
  857.   SegmentedSectionPtr ptr;
  858.   signal->getSection(ptr, SubCreateReq::ATTRIBUTE_LIST);
  859.   LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList);
  860.   append(attrBuf, ptr, getSectionSegmentPool());
  861. }
  862.       }
  863.     break;
  864. #if 0
  865.     case SubCreateReq::SelectiveTableSnapshot:
  866.       /**
  867.        * Tables specified by the user that does not exist
  868.        * in the database are just ignored. No error message
  869.        * is given, nor does the db nodes crash
  870.        * @todo: Memory is not release here (used tableBuf)
  871.        */
  872.       {
  873. if(signal->getNoOfSections() == 0 ){
  874.   jam();
  875.   sendSubCreateRef(signal, req, GrepError::WRONG_NO_OF_SECTIONS);
  876.   return;
  877. }
  878. jam();      
  879. SegmentedSectionPtr ptr;
  880. signal->getSection(ptr,0);// SubCreateReq::TABLE_LIST);
  881. SimplePropertiesSectionReader r0(ptr, getSectionSegmentPool());
  882. Uint32 i=0;
  883. char table[MAX_TAB_NAME_SIZE];
  884. r0.reset();
  885. r0.first();
  886. while(true){
  887.   if ((r0.getValueType() != SimpleProperties::StringValue) ||
  888.       (r0.getValueLen() <= 0)) {
  889.     releaseSections(signal);
  890.     ndbrequire(false);
  891.   }
  892.   r0.getString(table);
  893.   strcpy(subPtr.p->m_tableNames[i],table);
  894.   i++;
  895.   if(!r0.next())
  896.     break;
  897. }
  898. releaseSections(signal);
  899. subPtr.p->m_maxTables    = i;
  900. subPtr.p->m_currentTable = 0;
  901. releaseSections(signal);
  902. convertNameToId(subPtr, signal);
  903. return;
  904.       }
  905.     break;
  906. #endif
  907.     case SubCreateReq::DatabaseSnapshot:
  908.       {
  909. jam();
  910.       }
  911.     break;
  912.     default:
  913.       ndbrequire(false);
  914.     }
  915.   }
  916.   sendSubCreateConf(signal, sender, subPtr);
  917.   return;
  918. }
  919. void
  920. SumaParticipant::sendSubCreateConf(Signal* signal, Uint32 sender,
  921.    SubscriptionPtr subPtr)
  922. {
  923.   SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();      
  924.   conf->subscriptionId       = subPtr.p->m_subscriptionId;
  925.   conf->subscriptionKey      = subPtr.p->m_subscriptionKey;
  926.   conf->subscriberData       = subPtr.p->m_subscriberData;
  927.   sendSignal(sender, GSN_SUB_CREATE_CONF, signal,
  928.      SubCreateConf::SignalLength, JBB);
  929. }
  930. void
  931. SumaParticipant::sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errCode){
  932.   jam();
  933.   SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
  934.   ref->subscriberRef  = reference();
  935.   ref->subscriberData = req.subscriberData;
  936.   ref->err = errCode;
  937.   releaseSections(signal);
  938.   sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal, 
  939.      SubCreateRef::SignalLength, JBB);
  940.   return;
  941. }
  942. Uint32
  943. SumaParticipant::getFirstGCI(Signal* signal) {
  944.   if (c_lastCompleteGCI == RNIL) {
  945.     ndbout_c("WARNING: c_lastCompleteGCI == RNIL");
  946.     return 0;
  947.   }
  948.   return c_lastCompleteGCI+3;
  949. }
  950. /**********************************************************
  951.  *
  952.  * Setting upp trigger for subscription
  953.  *
  954.  */
  955. void 
  956. SumaParticipant::execSUB_SYNC_REQ(Signal* signal) {
  957.   jamEntry();
  958.   CRASH_INSERTION(13004);
  959. #ifdef EVENT_PH3_DEBUG
  960.   ndbout_c("SumaParticipant::execSUB_SYNC_REQ");
  961. #endif
  962.   SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();
  963.   SubscriptionPtr subPtr;
  964.   Subscription key; 
  965.   key.m_subscriptionId = req->subscriptionId;
  966.   key.m_subscriptionKey = req->subscriptionKey;
  967.   
  968.   if(!c_subscriptions.find(subPtr, key)){
  969.     jam();
  970.     sendSubSyncRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND);
  971.     return;
  972.   }
  973.   /**
  974.    * @todo Tomas, do you really need to do this?
  975.    */
  976.   if(subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {
  977.     jam();
  978.     subPtr.p->m_subscriberData = req->subscriberData;
  979.   }
  980.   bool ok = false;
  981.   SubscriptionData::Part part = (SubscriptionData::Part)req->part;
  982.   
  983.   Ptr<SyncRecord> syncPtr;
  984.   c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  985.   switch(part){
  986.   case SubscriptionData::MetaData:
  987.     ok = true;
  988.     jam();
  989.     if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) {
  990.       TableList::DataBufferIterator it;
  991.       syncPtr.p->m_tableList.first(it);
  992.       if(it.isNull()) {
  993. /**
  994.  * Get all tables from dict
  995.  */
  996. ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend();
  997. req->senderRef   = reference();
  998. req->senderData  = syncPtr.i;
  999. req->requestData = 0;
  1000. /**
  1001.  * @todo: accomodate scan of index tables?
  1002.  */
  1003. req->setTableType(DictTabInfo::UserTable);
  1004. sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal, 
  1005.    ListTablesReq::SignalLength, JBB);
  1006. break;
  1007.       }
  1008.     }
  1009.     syncPtr.p->startMeta(signal);
  1010.     break;
  1011.   case SubscriptionData::TableData: {
  1012.     ok = true;
  1013.     jam();
  1014.     syncPtr.p->startScan(signal);
  1015.     break;
  1016.   }
  1017.   }
  1018.   ndbrequire(ok);
  1019. }
  1020. void
  1021. SumaParticipant::sendSubSyncRef(Signal* signal, Uint32 errCode){
  1022.   jam();
  1023.   SubSyncRef  * ref = 
  1024.     (SubSyncRef *)signal->getDataPtrSend();
  1025.   ref->err = errCode;
  1026.   sendSignal(signal->getSendersBlockRef(), 
  1027.      GSN_SUB_SYNC_REF, 
  1028.      signal, 
  1029.      SubSyncRef::SignalLength,
  1030.      JBB);
  1031.      
  1032.   releaseSections(signal);  
  1033.   return;
  1034. }
  1035. /**********************************************************
  1036.  * Dict interface
  1037.  */
  1038. void
  1039. SumaParticipant::execLIST_TABLES_CONF(Signal* signal){
  1040.   jamEntry();
  1041.   CRASH_INSERTION(13005);
  1042.   ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr();
  1043.   SyncRecord* tmp = c_syncPool.getPtr(conf->senderData);
  1044.   tmp->runLIST_TABLES_CONF(signal);
  1045. }
  1046. void
  1047. SumaParticipant::execGET_TABINFOREF(Signal* signal){
  1048.   jamEntry();
  1049.   GetTabInfoRef* const ref = (GetTabInfoRef*)signal->getDataPtr();
  1050.   SyncRecord* tmp = c_syncPool.getPtr(ref->senderData);
  1051.   tmp->runGET_TABINFOREF(signal);
  1052. }
  1053. void
  1054. SumaParticipant::execGET_TABINFO_CONF(Signal* signal){
  1055.   jamEntry();
  1056.   CRASH_INSERTION(13006);
  1057.   if(!assembleFragments(signal)){
  1058.     return;
  1059.   }
  1060.   
  1061.   GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
  1062.   
  1063.   Uint32 tableId = conf->tableId;
  1064.   Uint32 senderData = conf->senderData;
  1065.   SyncRecord* tmp = c_syncPool.getPtr(senderData);
  1066.   ndbrequire(parseTable(signal, conf, tableId, tmp));
  1067.   tmp->runGET_TABINFO_CONF(signal);
  1068. }
  1069. bool
  1070. SumaParticipant::parseTable(Signal* signal, GetTabInfoConf* conf, Uint32 tableId,
  1071.     SyncRecord* syncPtr_p){
  1072.   SegmentedSectionPtr ptr;
  1073.   signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
  1074.   
  1075.   SimplePropertiesSectionReader it(ptr, getSectionSegmentPool());
  1076.   
  1077.   SimpleProperties::UnpackStatus s;
  1078.   DictTabInfo::Table tableDesc; tableDesc.init();
  1079.   s = SimpleProperties::unpack(it, &tableDesc, 
  1080.        DictTabInfo::TableMapping, 
  1081.        DictTabInfo::TableMappingSize, 
  1082.        true, true);
  1083.   
  1084.   ndbrequire(s == SimpleProperties::Break);
  1085.   TablePtr tabPtr;
  1086.   c_tables.find(tabPtr, tableId);
  1087.   
  1088.   if(!tabPtr.isNull() &&
  1089.      tabPtr.p->m_schemaVersion != tableDesc.TableVersion){
  1090.     jam();
  1091.     tabPtr.p->release(* this);
  1092.     // oops wrong schema version in stored tabledesc
  1093.     // we need to find all subscriptions with old table desc
  1094.     // and all subscribers to this
  1095.     // hopefully none
  1096.     c_tables.release(tabPtr);
  1097.     tabPtr.setNull();
  1098.     DLHashTable<SumaParticipant::Subscription>::Iterator i_subPtr;
  1099.     c_subscriptions.first(i_subPtr);
  1100.     SubscriptionPtr subPtr;
  1101.     for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){
  1102.       jam();
  1103.       c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);
  1104.       SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
  1105.       if (tmp == syncPtr_p) {
  1106. jam();
  1107. continue;
  1108.       }
  1109.       if (subPtr.p->m_tables[tableId]) {
  1110. jam();
  1111. subPtr.p->m_tables[tableId] = 0; // remove this old table reference
  1112. TableList::DataBufferIterator it;
  1113. for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {
  1114.   jam();
  1115.   if (*it.data == tableId){
  1116.     jam();
  1117.     Uint32 *pdata = it.data;
  1118.     tmp->m_tableList.next(it);
  1119.     for(;!it.isNull();tmp->m_tableList.next(it)) {
  1120.       jam();
  1121.       *pdata = *it.data;
  1122.       pdata = it.data;
  1123.     }
  1124.     *pdata = RNIL; // todo remove this last item...
  1125.     break;
  1126.   }
  1127. }
  1128.       }
  1129.     }
  1130.   }
  1131.   if (tabPtr.isNull()) {
  1132.     jam();
  1133.     /**
  1134.      * Uninitialized table record
  1135.      */
  1136.     ndbrequire(c_tables.seize(tabPtr));
  1137.     new (tabPtr.p) Table;
  1138.     tabPtr.p->m_schemaVersion = RNIL;
  1139.     tabPtr.p->m_tableId = tableId;
  1140.     tabPtr.p->m_hasTriggerDefined[0] = 0;
  1141.     tabPtr.p->m_hasTriggerDefined[1] = 0;
  1142.     tabPtr.p->m_hasTriggerDefined[2] = 0;
  1143.     tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
  1144.     tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
  1145.     tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
  1146. #if 0
  1147.     ndbout_c("Get tab info conf %d", tableId);
  1148. #endif
  1149.     c_tables.add(tabPtr);
  1150.   }
  1151.   if(tabPtr.p->m_attributes.getSize() != 0){
  1152.     jam();
  1153.     return true;
  1154.   }
  1155.   /**
  1156.    * Initialize table object
  1157.    */
  1158.   Uint32 noAttribs = tableDesc.NoOfAttributes;
  1159.   Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable);
  1160.   tabPtr.p->m_schemaVersion = tableDesc.TableVersion;
  1161.   
  1162.   // The attribute buffer
  1163.   LocalDataBuffer<15> attrBuf(c_dataBufferPool, tabPtr.p->m_attributes);
  1164.   
  1165.   // Temporary buffer
  1166.   DataBuffer<15> theRest(c_dataBufferPool);
  1167.   if(!attrBuf.seize(noAttribs)){
  1168.     ndbrequire(false);
  1169.     return false;
  1170.   }
  1171.   
  1172.   if(!theRest.seize(notFixed)){
  1173.     ndbrequire(false);
  1174.     return false;
  1175.   }
  1176.   
  1177.   DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable
  1178.   DataBuffer<15>::DataBufferIterator restIt; // variable + nullable
  1179.   attrBuf.first(attrIt);
  1180.   theRest.first(restIt);
  1181.   
  1182.   for(Uint32 i = 0; i < noAttribs; i++) {
  1183.     DictTabInfo::Attribute attrDesc; attrDesc.init();
  1184.     s = SimpleProperties::unpack(it, &attrDesc, 
  1185.  DictTabInfo::AttributeMapping, 
  1186.  DictTabInfo::AttributeMappingSize, 
  1187.  true, true);
  1188.     ndbrequire(s == SimpleProperties::Break);
  1189.     if (!attrDesc.AttributeNullableFlag 
  1190. /* && !attrDesc.AttributeVariableFlag */) {
  1191.       jam();
  1192.       * attrIt.data = attrDesc.AttributeId;
  1193.       attrBuf.next(attrIt);
  1194.     } else {
  1195.       jam();
  1196.       * restIt.data = attrDesc.AttributeId;
  1197.       theRest.next(restIt);
  1198.     }
  1199.     
  1200.     // Move to next attribute
  1201.     it.next();
  1202.   }
  1203.   /**
  1204.    * Put the rest in end of attrBuf
  1205.    */
  1206.   theRest.first(restIt);
  1207.   for(; !restIt.isNull(); theRest.next(restIt)){
  1208.     * attrIt.data = * restIt.data;
  1209.     attrBuf.next(attrIt);
  1210.   }
  1211.   theRest.release();
  1212.   
  1213.   return true;
  1214. }
  1215. void
  1216. SumaParticipant::execDI_FCOUNTCONF(Signal* signal){
  1217.   jamEntry();
  1218.   
  1219.   CRASH_INSERTION(13007);
  1220.   const Uint32 senderData = signal->theData[3];
  1221.   SyncRecord* tmp = c_syncPool.getPtr(senderData);
  1222.   tmp->runDI_FCOUNTCONF(signal);
  1223. }
  1224. void 
  1225. SumaParticipant::execDIGETPRIMCONF(Signal* signal){
  1226.   jamEntry();
  1227.   
  1228.   CRASH_INSERTION(13008);
  1229.   const Uint32 senderData = signal->theData[1];
  1230.   SyncRecord* tmp = c_syncPool.getPtr(senderData);
  1231.   tmp->runDIGETPRIMCONF(signal);
  1232. }
  1233. void
  1234. SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){
  1235.   jamEntry();
  1236.   CRASH_INSERTION(13009);
  1237.   CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
  1238.   const Uint32 senderData = conf->getConnectionPtr();
  1239.   SyncRecord* tmp = c_syncPool.getPtr(senderData);
  1240.   tmp->runCREATE_TRIG_CONF(signal);
  1241.   
  1242.   /**
  1243.    * dodido
  1244.    * @todo: I (Johan) dont know what to do here. Jonas, what do you mean?
  1245.    */
  1246. }
  1247. void
  1248. SumaParticipant::execCREATE_TRIG_REF(Signal* signal){
  1249.   jamEntry();
  1250.   ndbrequire(false);
  1251. }
  1252. void
  1253. SumaParticipant::execDROP_TRIG_CONF(Signal* signal){
  1254.   jamEntry();
  1255.   CRASH_INSERTION(13010);
  1256.   DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
  1257.   const Uint32 senderData = conf->getConnectionPtr();
  1258.   SyncRecord* tmp = c_syncPool.getPtr(senderData);
  1259.   tmp->runDROP_TRIG_CONF(signal);
  1260. }
  1261. void
  1262. SumaParticipant::execDROP_TRIG_REF(Signal* signal){
  1263.   jamEntry();
  1264.   DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
  1265.   const Uint32 senderData = ref->getConnectionPtr();
  1266.   SyncRecord* tmp = c_syncPool.getPtr(senderData);
  1267.   tmp->runDROP_TRIG_CONF(signal);
  1268. }
  1269. /*************************************************************************
  1270.  *
  1271.  *
  1272.  */
  1273. void
  1274. SumaParticipant::SyncRecord::runLIST_TABLES_CONF(Signal* signal){
  1275.   jam();
  1276.   ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr();
  1277.   const Uint32 len = signal->length() - ListTablesConf::HeaderLength;
  1278.   SubscriptionPtr subPtr;
  1279.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  1280.   for (unsigned i = 0; i < len; i++) {
  1281.     subPtr.p->m_maxTables++;
  1282.     suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this);
  1283.   }
  1284.   //  for (unsigned i = 0; i < len; i++)
  1285.   //    conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);
  1286.   //  m_tableList.append(&conf->tableData[0], len);
  1287. #if 0 
  1288.   TableList::DataBufferIterator it;
  1289.   int i = 0;
  1290.   for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) {
  1291.     ndbout_c("%u listtableconf tableid %d", i++, *it.data);
  1292.   }
  1293. #endif
  1294.   if(len == ListTablesConf::DataLength){
  1295.     jam();
  1296.     // we expect more LIST_TABLE_CONF
  1297.     return;
  1298.   }
  1299. #if 0
  1300.   subPtr.p->m_currentTable = 0;
  1301.   subPtr.p->m_maxTables    = 0;
  1302.   TableList::DataBufferIterator it;
  1303.   for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) {
  1304.     subPtr.p->m_maxTables++;
  1305.     suma.addTableId(*it.data, subPtr, NULL);
  1306. #ifdef NODEFAIL_DEBUG
  1307.     ndbout_c(" listtableconf tableid %d",*it.data);
  1308. #endif
  1309.   }
  1310. #endif
  1311.   
  1312.   startMeta(signal);
  1313. }
  1314. void
  1315. SumaParticipant::SyncRecord::startMeta(Signal* signal){
  1316.   jam();
  1317.   m_currentTable = 0;
  1318.   nextMeta(signal);
  1319. }
  1320. /**
  1321.  * m_tableList only contains UserTables
  1322.  */
  1323. void
  1324. SumaParticipant::SyncRecord::nextMeta(Signal* signal){
  1325.   jam();
  1326.   
  1327.   TableList::DataBufferIterator it;
  1328.   if(!m_tableList.position(it, m_currentTable)){
  1329.     completeMeta(signal);
  1330.     return;
  1331.   }
  1332.   GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
  1333.   req->senderRef = suma.reference();
  1334.   req->senderData = ptrI;
  1335.   req->requestType = 
  1336.     GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
  1337.   req->tableId = * it.data;
  1338. #if 0
  1339.   ndbout_c("GET_TABINFOREQ id %d", req->tableId);
  1340. #endif
  1341.   suma.sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal, 
  1342.   GetTabInfoReq::SignalLength, JBB);
  1343. }
  1344. void
  1345. SumaParticipant::SyncRecord::runGET_TABINFOREF(Signal* signal)
  1346. {
  1347.   jam();
  1348.   SubscriptionPtr subPtr;
  1349.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  1350.   ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  1351.   Uint32 type = subPtr.p->m_subscriptionType;
  1352.   bool do_continue = false;
  1353.   switch (type) {
  1354.   case SubCreateReq::TableEvent:
  1355.     jam();
  1356.     break;
  1357.   case SubCreateReq::DatabaseSnapshot:
  1358.     jam();
  1359.     do_continue = true;
  1360.     break;
  1361.   case SubCreateReq::SelectiveTableSnapshot:
  1362.     jam();
  1363.     do_continue = true;
  1364.     break;
  1365.   case SubCreateReq::SingleTableScan:
  1366.     jam();
  1367.     break;
  1368.   default:
  1369.     ndbrequire(false);
  1370.     break;
  1371.   }
  1372.   if (! do_continue) {
  1373.     m_error = true;
  1374.     completeMeta(signal);
  1375.     return;
  1376.   }
  1377.   m_currentTable++;
  1378.   nextMeta(signal);
  1379.   return;
  1380.   // now we need to clean-up
  1381. }
  1382. void
  1383. SumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){
  1384.   jam();
  1385.   
  1386.   GetTabInfoConf * const conf = (GetTabInfoConf*)signal->getDataPtr();
  1387.   //  const Uint32 gci = conf->gci;
  1388.   const Uint32 tableId = conf->tableId;
  1389.   TableList::DataBufferIterator it;
  1390.   
  1391.   ndbrequire(m_tableList.position(it, m_currentTable));
  1392.   ndbrequire(* it.data == tableId);
  1393.   
  1394.   SubscriptionPtr subPtr;
  1395.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  1396.   ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  1397.   
  1398.   SegmentedSectionPtr ptr;
  1399.   signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
  1400.   SubMetaData * data = (SubMetaData*)signal->getDataPtrSend();
  1401.   /** 
  1402.    * sending lastCompleteGCI. Used by Lars in interval calculations
  1403.    * incremenet by one, since last_CompleteGCI is the not the current gci.
  1404.    */
  1405.   data->gci = suma.c_lastCompleteGCI + 1;
  1406.   data->tableId = tableId;
  1407.   data->senderData = subPtr.p->m_subscriberData;
  1408. #if PRINT_ONLY
  1409.   ndbout_c("GSN_SUB_META_DATA Table %d", tableId);
  1410. #else
  1411.   bool okToSend = m_doSendSyncData;
  1412.   /*
  1413.    * If it is a selectivetablesnapshot and the table is not part of the 
  1414.    * subscription, then do not send anything, just continue.
  1415.    * If it is a tablevent, don't send regardless since the APIs are not
  1416.    * interested in meta data.
  1417.    */
  1418.   if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)
  1419.     if(!subPtr.p->m_tables[tableId])
  1420.       okToSend = false;
  1421.   if(okToSend) {
  1422.     if(refToNode(subPtr.p->m_subscriberRef) == 0){
  1423.       jam();
  1424.       suma.EXECUTE_DIRECT(refToBlock(subPtr.p->m_subscriberRef),
  1425.   GSN_SUB_META_DATA,
  1426.   signal, 
  1427.   SubMetaData::SignalLength); 
  1428.       jamEntry();
  1429.       suma.releaseSections(signal);
  1430.     } else {
  1431.       jam();
  1432.       suma.sendSignal(subPtr.p->m_subscriberRef, 
  1433.       GSN_SUB_META_DATA,
  1434.       signal, 
  1435.       SubMetaData::SignalLength, JBB);
  1436.     }
  1437.   }
  1438. #endif
  1439.   
  1440.   TablePtr tabPtr;
  1441.   ndbrequire(suma.c_tables.find(tabPtr, tableId));
  1442.   
  1443.   LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
  1444.   if(fragBuf.getSize() == 0){
  1445.     /**
  1446.      * We need to gather fragment info
  1447.      */
  1448.     jam();
  1449.     signal->theData[0] = RNIL;
  1450.     signal->theData[1] = tableId;
  1451.     signal->theData[2] = ptrI;
  1452.     suma.sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);    
  1453.     return;
  1454.   }
  1455.   
  1456.   m_currentTable++;
  1457.   nextMeta(signal);
  1458. }
  1459. void 
  1460. SumaParticipant::SyncRecord::runDI_FCOUNTCONF(Signal* signal){
  1461.   jam();
  1462.   const Uint32 userPtr = signal->theData[0];
  1463.   const Uint32 fragCount = signal->theData[1];
  1464.   const Uint32 tableId = signal->theData[2];
  1465.   ndbrequire(userPtr == RNIL && signal->length() == 5);
  1466.   TablePtr tabPtr;
  1467.   ndbrequire(suma.c_tables.find(tabPtr, tableId));
  1468.   
  1469.   LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);  
  1470.   ndbrequire(fragBuf.getSize() == 0);
  1471.   
  1472.   m_currentFragment = fragCount;
  1473.   signal->theData[0] = RNIL;
  1474.   signal->theData[1] = ptrI;
  1475.   signal->theData[2] = tableId;
  1476.   signal->theData[3] = 0; // Frag no
  1477.   suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
  1478. }
  1479. void
  1480. SumaParticipant::SyncRecord::runDIGETPRIMCONF(Signal* signal){
  1481.   jam();
  1482.   const Uint32 userPtr = signal->theData[0];
  1483.   //const Uint32 senderData = signal->theData[1];
  1484.   const Uint32 nodeCount = signal->theData[6];
  1485.   const Uint32 tableId = signal->theData[7];
  1486.   const Uint32 fragNo = signal->theData[8];
  1487.   
  1488.   ndbrequire(userPtr == RNIL && signal->length() == 9);
  1489.   ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
  1490.   
  1491.   TablePtr tabPtr;
  1492.   ndbrequire(suma.c_tables.find(tabPtr, tableId));
  1493.   LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);  
  1494.   /**
  1495.    * Add primary node for fragment to list
  1496.    */
  1497.   FragmentDescriptor fd;
  1498.   fd.m_fragDesc.m_nodeId = signal->theData[2];
  1499.   fd.m_fragDesc.m_fragmentNo = fragNo;
  1500.   signal->theData[2] = fd.m_dummy;
  1501.   fragBuf.append(&signal->theData[2], 1);
  1502.   
  1503.   const Uint32 nextFrag = fragNo + 1;
  1504.   if(nextFrag == m_currentFragment){
  1505.     /**
  1506.      * Complete frag info for table
  1507.      */
  1508.     m_currentTable++;
  1509.     nextMeta(signal);
  1510.     return;
  1511.   }
  1512.   signal->theData[0] = RNIL;
  1513.   signal->theData[1] = ptrI;
  1514.   signal->theData[2] = tableId;
  1515.   signal->theData[3] = nextFrag; // Frag no
  1516.   suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
  1517. }
  1518. void
  1519. SumaParticipant::SyncRecord::completeMeta(Signal* signal){
  1520.   jam();
  1521.   SubscriptionPtr subPtr;
  1522.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  1523.   ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  1524.   
  1525. #if PRINT_ONLY
  1526.   ndbout_c("GSN_SUB_SYNC_CONF (meta)");
  1527. #else
  1528.  
  1529.   suma.releaseSections(signal);
  1530.   if (m_error) {
  1531.     SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
  1532.     ref->subscriptionId = subPtr.p->m_subscriptionId;
  1533.     ref->subscriptionKey = subPtr.p->m_subscriptionKey;
  1534.     ref->part = SubscriptionData::MetaData;
  1535.     ref->subscriberData = subPtr.p->m_subscriberData;
  1536.     ref->errorCode = SubSyncRef::Undefined;
  1537.     suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_REF, signal,
  1538.     SubSyncRef::SignalLength, JBB);
  1539.   } else {
  1540.     SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
  1541.     conf->subscriptionId = subPtr.p->m_subscriptionId;
  1542.     conf->subscriptionKey = subPtr.p->m_subscriptionKey;
  1543.     conf->part = SubscriptionData::MetaData;
  1544.     conf->subscriberData = subPtr.p->m_subscriberData;
  1545.     suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,
  1546.     SubSyncConf::SignalLength, JBB);
  1547.   }
  1548. #endif
  1549. }
  1550. /**********************************************************
  1551.  *
  1552.  * Scan interface
  1553.  *
  1554.  */
  1555. void
  1556. SumaParticipant::SyncRecord::startScan(Signal* signal){
  1557.   jam();
  1558.   
  1559.   /**
  1560.    * Get fraginfo
  1561.    */
  1562.   m_currentTable = 0;
  1563.   m_currentFragment = 0;
  1564.   
  1565.   nextScan(signal);
  1566. }
  1567. bool
  1568. SumaParticipant::SyncRecord::getNextFragment(TablePtr * tab, 
  1569.      FragmentDescriptor * fd){
  1570.   jam();
  1571.   SubscriptionPtr subPtr;
  1572.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  1573.   TableList::DataBufferIterator tabIt;
  1574.   DataBuffer<15>::DataBufferIterator fragIt;
  1575.   
  1576.   m_tableList.position(tabIt, m_currentTable);
  1577.   for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++){
  1578.     TablePtr tabPtr;
  1579.     ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data));
  1580.     if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) 
  1581.       {
  1582. if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {
  1583.   *tab = tabPtr;
  1584.   return true;
  1585. }
  1586.       }
  1587.     LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);
  1588.     
  1589.     fragBuf.position(fragIt, m_currentFragment);
  1590.     for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++){
  1591.       FragmentDescriptor tmp;
  1592.       tmp.m_dummy = * fragIt.data;
  1593.       if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){
  1594. * fd = tmp;
  1595. * tab = tabPtr;
  1596. return true;
  1597.       }
  1598.     }
  1599.     m_currentFragment = 0;
  1600.   }
  1601.   return false;
  1602. }
  1603. void
  1604. SumaParticipant::SyncRecord::nextScan(Signal* signal){
  1605.   jam();
  1606.   TablePtr tabPtr;
  1607.   FragmentDescriptor fd;
  1608.   SubscriptionPtr subPtr;
  1609.   if(!getNextFragment(&tabPtr, &fd)){
  1610.     jam();
  1611.     completeScan(signal);
  1612.     return;
  1613.   }
  1614.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  1615.   ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  1616.  
  1617.   if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) {
  1618.     jam();
  1619.     if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {
  1620.       /*
  1621.        * table is not part of the subscription. Check next table
  1622.        */
  1623.       m_currentTable++;
  1624.       nextScan(signal);
  1625.       return;
  1626.     }
  1627.   }
  1628.   DataBuffer<15>::Head head = m_attributeList;
  1629.   if(head.getSize() == 0){
  1630.     head = tabPtr.p->m_attributes;
  1631.   }
  1632.   LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head);
  1633.   
  1634.   ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
  1635.   const Uint32 parallelism = 16;
  1636.   const Uint32 attrLen = 5 + attrBuf.getSize();
  1637.   req->senderData = m_subscriptionPtrI;
  1638.   req->resultRef = suma.reference();
  1639.   req->tableId = tabPtr.p->m_tableId;
  1640.   req->requestInfo = 0;
  1641.   req->savePointId = 0;
  1642.   ScanFragReq::setLockMode(req->requestInfo, 0);
  1643.   ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
  1644.   ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
  1645.   ScanFragReq::setAttrLen(req->requestInfo, attrLen);
  1646.   req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
  1647.   req->schemaVersion = tabPtr.p->m_schemaVersion;
  1648.   req->transId1 = 0;
  1649.   req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
  1650.   req->clientOpPtr = (ptrI << 16);
  1651.   req->batch_size_rows= 16;
  1652.   req->batch_size_bytes= 0;
  1653.   suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 
  1654.   ScanFragReq::SignalLength, JBB);
  1655.   
  1656.   signal->theData[0] = ptrI;
  1657.   signal->theData[1] = 0;
  1658.   signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8);
  1659.   
  1660.   // Return all
  1661.   signal->theData[3] = attrBuf.getSize();
  1662.   signal->theData[4] = 0;
  1663.   signal->theData[5] = 0;
  1664.   signal->theData[6] = 0;
  1665.   signal->theData[7] = 0;
  1666.   
  1667.   Uint32 dataPos = 8;
  1668.   DataBuffer<15>::DataBufferIterator it;
  1669.   for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
  1670.     AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0);
  1671.     if(dataPos == 25){
  1672.       suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, 25, JBB);
  1673. dataPos = 3;
  1674.     }
  1675.   }
  1676.   if(dataPos != 3){
  1677.     suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB);
  1678.   }
  1679.   
  1680.   m_currentTableId = tabPtr.p->m_tableId;
  1681.   m_currentNoOfAttributes = attrBuf.getSize();        
  1682. }
  1683. void
  1684. SumaParticipant::execSCAN_FRAGREF(Signal* signal){
  1685.   jamEntry();
  1686. //  ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();
  1687.   ndbrequire(false);
  1688. }
  1689. void
  1690. SumaParticipant::execSCAN_FRAGCONF(Signal* signal){
  1691.   jamEntry();
  1692.   CRASH_INSERTION(13011);
  1693.   ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();
  1694.   
  1695.   const Uint32 completed = conf->fragmentCompleted;
  1696.   const Uint32 senderData = conf->senderData;
  1697.   const Uint32 completedOps = conf->completedOps;
  1698.   SubscriptionPtr subPtr;
  1699.   c_subscriptions.getPtr(subPtr, senderData);
  1700.   
  1701.   if(completed != 2){
  1702.     jam();
  1703.     
  1704. #if PRINT_ONLY
  1705.     SubSyncContinueConf * const conf = 
  1706.       (SubSyncContinueConf*)signal->getDataPtrSend();  
  1707.     conf->subscriptionId = subPtr.p->m_subscriptionId;
  1708.     conf->subscriptionKey = subPtr.p->m_subscriptionKey;
  1709.     execSUB_SYNC_CONTINUE_CONF(signal);
  1710. #else
  1711.     SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();
  1712.     req->subscriberData = subPtr.p->m_subscriberData;
  1713.     req->noOfRowsSent = completedOps;
  1714.     sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
  1715.        SubSyncContinueReq::SignalLength, JBB);
  1716. #endif
  1717.     return;
  1718.   }
  1719.   ndbrequire(completedOps == 0);
  1720.   
  1721.   SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
  1722.   
  1723.   tmp->m_currentFragment++;
  1724.   tmp->nextScan(signal);
  1725. }
  1726. void
  1727. SumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
  1728.   jamEntry();
  1729.   
  1730.   CRASH_INSERTION(13012);
  1731.   SubSyncContinueConf * const conf = 
  1732.     (SubSyncContinueConf*)signal->getDataPtr();  
  1733.   
  1734.   SubscriptionPtr subPtr;
  1735.   Subscription key; 
  1736.   key.m_subscriptionId = conf->subscriptionId;
  1737.   key.m_subscriptionKey = conf->subscriptionKey;
  1738.   
  1739.   ndbrequire(c_subscriptions.find(subPtr, key));
  1740.   ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();
  1741.   req->senderData = subPtr.i;
  1742.   req->closeFlag = 0;
  1743.   req->transId1 = 0;
  1744.   req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
  1745.   req->batch_size_rows = 16;
  1746.   req->batch_size_bytes = 0;
  1747.   sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, 
  1748.      ScanFragNextReq::SignalLength, JBB);
  1749. }
  1750. void
  1751. SumaParticipant::SyncRecord::completeScan(Signal* signal){
  1752.   jam();
  1753.   //  m_tableList.release();
  1754.   SubscriptionPtr subPtr;
  1755.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  1756.   ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  1757.   
  1758. #if PRINT_ONLY
  1759.   ndbout_c("GSN_SUB_SYNC_CONF (data)");
  1760. #else
  1761.   SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
  1762.   conf->subscriptionId = subPtr.p->m_subscriptionId;
  1763.   conf->subscriptionKey = subPtr.p->m_subscriptionKey;
  1764.   conf->part = SubscriptionData::TableData;
  1765.   conf->subscriberData = subPtr.p->m_subscriberData;
  1766.   suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,
  1767.   SubSyncConf::SignalLength, JBB);
  1768. #endif
  1769. }
  1770. void
  1771. SumaParticipant::execSCAN_HBREP(Signal* signal){
  1772.   jamEntry();
  1773. #if 0
  1774.   ndbout << "execSCAN_HBREP" << endl << hex;
  1775.   for(int i = 0; i<signal->length(); i++){
  1776.     ndbout << signal->theData[i] << " ";
  1777.     if(((i + 1) % 8) == 0)
  1778.       ndbout << endl << hex;
  1779.   }
  1780.   ndbout << endl;
  1781. #endif
  1782. }
  1783. /**********************************************************
  1784.  *
  1785.  * Suma participant interface
  1786.  *
  1787.  * Creation of subscriber
  1788.  *
  1789.  */
  1790. void
  1791. SumaParticipant::execSUB_START_REQ(Signal* signal){
  1792.   jamEntry();
  1793. #ifdef NODEFAIL_DEBUG
  1794.   ndbout_c("Suma::execSUB_START_REQ");
  1795. #endif
  1796.   CRASH_INSERTION(13013);
  1797.   if (c_restartLock) {
  1798.     jam();
  1799.     //    ndbout_c("c_restartLock");
  1800.     if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
  1801.       jam();
  1802.       sendSubStartRef(signal, /** Error Code */ 0, true);
  1803.       return;
  1804.     }
  1805.     // only allow other Suma's in the nodegroup to come through for restart purposes
  1806.   }
  1807.   Subscription key; 
  1808.   SubStartReq * const req = (SubStartReq*)signal->getDataPtr();
  1809.   Uint32 senderRef            = req->senderRef;
  1810.   Uint32 senderData           = req->senderData;
  1811.   Uint32 subscriberData       = req->subscriberData;
  1812.   Uint32 subscriberRef        = req->subscriberRef;
  1813.   SubscriptionData::Part part = (SubscriptionData::Part)req->part;
  1814.   key.m_subscriptionId        = req->subscriptionId;
  1815.   key.m_subscriptionKey       = req->subscriptionKey;
  1816.   SubscriptionPtr subPtr;
  1817.   if(!c_subscriptions.find(subPtr, key)){
  1818.     jam();
  1819.     sendSubStartRef(signal, /** Error Code */ 0);
  1820.     return;
  1821.   }
  1822.   
  1823.   Ptr<SyncRecord> syncPtr;
  1824.   c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  1825.   if (syncPtr.p->m_locked) {
  1826.     jam();
  1827. #if 0
  1828.     ndbout_c("Locked");
  1829. #endif
  1830.     sendSubStartRef(signal, /** Error Code */ 0, true);
  1831.     return;
  1832.   }
  1833.   syncPtr.p->m_locked = true;
  1834.   SubscriberPtr subbPtr;
  1835.   if(!c_subscriberPool.seize(subbPtr)){
  1836.     jam();
  1837.     syncPtr.p->m_locked = false;
  1838.     sendSubStartRef(signal, /** Error Code */ 0);
  1839.     return;
  1840.   }
  1841.   Uint32 type = subPtr.p->m_subscriptionType;
  1842.   subbPtr.p->m_senderRef  = senderRef;
  1843.   subbPtr.p->m_senderData = senderData;
  1844.   switch (type) {
  1845.   case SubCreateReq::TableEvent:
  1846.     jam();
  1847.     // we want the data to return to the API not DICT
  1848.     subbPtr.p->m_subscriberRef = subscriberRef;
  1849.     //    ndbout_c("start ref = %u", signal->getSendersBlockRef());
  1850.     //    ndbout_c("ref = %u", subbPtr.p->m_subscriberRef);
  1851.     // we use the subscription id for now, should really be API choice
  1852.     subbPtr.p->m_subscriberData = subscriberData;
  1853. #if 0
  1854.     if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
  1855.       jam();
  1856.       for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
  1857. Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
  1858. if (ref != reference()) {
  1859.   jam();
  1860.   sendSubStartReq(subPtr, subbPtr, signal, ref);
  1861. } else
  1862.   jam();
  1863.       }
  1864.     }
  1865. #endif
  1866.     break;
  1867.   case SubCreateReq::DatabaseSnapshot:
  1868.   case SubCreateReq::SelectiveTableSnapshot:
  1869.     jam();
  1870.     ndbrequire(false);
  1871.     //subbPtr.p->m_subscriberRef = GREP_REF;
  1872.     subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;
  1873.     break;
  1874.   case SubCreateReq::SingleTableScan:
  1875.     jam();
  1876.     subbPtr.p->m_subscriberRef = subPtr.p->m_subscriberRef;
  1877.     subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;
  1878.   }
  1879.   
  1880.   subbPtr.p->m_subPtrI = subPtr.i;
  1881.   subbPtr.p->m_firstGCI = RNIL;
  1882.   if (type == SubCreateReq::TableEvent)
  1883.     subbPtr.p->m_lastGCI = 0;
  1884.   else
  1885.     subbPtr.p->m_lastGCI = RNIL; // disable usage of m_lastGCI
  1886.   bool ok = false;
  1887.   
  1888.   switch(part){
  1889.   case SubscriptionData::MetaData:
  1890.     ok = true;
  1891.     jam();
  1892.     c_metaSubscribers.add(subbPtr);
  1893.     sendSubStartComplete(signal, subbPtr, 0, part);
  1894.     break;
  1895.   case SubscriptionData::TableData: 
  1896.     ok = true;
  1897.     jam();
  1898.     c_prepDataSubscribers.add(subbPtr);
  1899.     syncPtr.p->startTrigger(signal);
  1900.     break;
  1901.   }
  1902.   ndbrequire(ok);
  1903. }
  1904. void
  1905. SumaParticipant::sendSubStartComplete(Signal* signal,
  1906.       SubscriberPtr subbPtr, 
  1907.       Uint32 firstGCI,
  1908.       SubscriptionData::Part part){
  1909.   jam();
  1910.   SubscriptionPtr subPtr;
  1911.   c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
  1912.   Ptr<SyncRecord> syncPtr;
  1913.   c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  1914.   syncPtr.p->m_locked = false;
  1915.   SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend();    
  1916.   
  1917.   conf->senderRef       = reference();
  1918.   conf->senderData      = subbPtr.p->m_senderData;
  1919.   conf->subscriptionId  = subPtr.p->m_subscriptionId;
  1920.   conf->subscriptionKey = subPtr.p->m_subscriptionKey;
  1921.   conf->firstGCI = firstGCI;
  1922.   conf->part = (Uint32) part;
  1923.   
  1924.   conf->subscriberData = subPtr.p->m_subscriberData;
  1925.   sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_CONF, signal,
  1926.      SubStartConf::SignalLength, JBB);
  1927. }
  1928. #if 0
  1929. void
  1930. SumaParticipant::sendSubStartRef(SubscriptionPtr subPtr,
  1931.  Signal* signal, Uint32 errCode,
  1932.  bool temporary){
  1933.   jam();
  1934.   SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
  1935.   xxx ref->senderRef       = reference();
  1936.   xxx ref->senderData      = subPtr.p->m_senderData;
  1937.   ref->subscriptionId  = subPtr.p->m_subscriptionId;
  1938.   ref->subscriptionKey = subPtr.p->m_subscriptionKey;
  1939.   ref->part            = (Uint32) subPtr.p->m_subscriptionType;
  1940.   ref->subscriberData  = subPtr.p->m_subscriberData;
  1941.   ref->err             = errCode;
  1942.   if (temporary) {
  1943.     jam();
  1944.     ref->setTemporary();
  1945.   }
  1946.   releaseSections(signal);
  1947.   sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_REF, signal, 
  1948.      SubStartRef::SignalLength, JBB);
  1949. }
  1950. #endif
  1951. void
  1952. SumaParticipant::sendSubStartRef(Signal* signal, Uint32 errCode,
  1953.  bool temporary){
  1954.   jam();
  1955.   SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
  1956.   ref->senderRef  = reference();
  1957.   ref->err = errCode;
  1958.   if (temporary) {
  1959.     jam();
  1960.     ref->setTemporary();
  1961.   }
  1962.   releaseSections(signal);
  1963.   sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal, 
  1964.      SubStartRef::SignalLength, JBB);
  1965. }
  1966. /**********************************************************
  1967.  *
  1968.  * Trigger admin interface
  1969.  *
  1970.  */
  1971. void
  1972. SumaParticipant::SyncRecord::startTrigger(Signal* signal){
  1973.   jam();
  1974.   m_currentTable = 0;
  1975.   m_latestTriggerId = RNIL;
  1976.   nextTrigger(signal);
  1977. }
  1978. void
  1979. SumaParticipant::SyncRecord::nextTrigger(Signal* signal){
  1980.   jam();
  1981.   TableList::DataBufferIterator it;
  1982.   
  1983.   if(!m_tableList.position(it, m_currentTable)){
  1984.     completeTrigger(signal);
  1985.     return;
  1986.   }
  1987.   SubscriptionPtr subPtr;
  1988.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  1989.   ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  1990.   const Uint32 RT_BREAK = 48;
  1991.   Uint32 latestTriggerId = 0;
  1992.   for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){   
  1993.     TablePtr tabPtr;
  1994. #if 0
  1995.     ndbout_c("nextTrigger tableid %u", *it.data);
  1996. #endif
  1997.     ndbrequire(suma.c_tables.find(tabPtr, *it.data));
  1998.     AttributeMask attrMask;
  1999.     createAttributeMask(attrMask, tabPtr.p);
  2000.     for(Uint32 j = 0; j<3; j++){
  2001.       i++;
  2002.       latestTriggerId = (tabPtr.p->m_schemaVersion << 18) |
  2003. (j << 16) | tabPtr.p->m_tableId;
  2004.       if(tabPtr.p->m_hasTriggerDefined[j] == 0) {
  2005. ndbrequire(tabPtr.p->m_triggerIds[j] == ILLEGAL_TRIGGER_ID);
  2006. #if 0
  2007. ndbout_c("DEFINING trigger on table %u[%u]", tabPtr.p->m_tableId, j);
  2008. #endif
  2009. CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
  2010. req->setUserRef(SUMA_REF);
  2011. req->setConnectionPtr(ptrI);
  2012. req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
  2013. req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
  2014. req->setMonitorReplicas(true);
  2015. req->setMonitorAllAttributes(false);
  2016. req->setReceiverRef(SUMA_REF);
  2017. req->setTriggerId(latestTriggerId);
  2018. req->setTriggerEvent((TriggerEvent::Value)j);
  2019. req->setTableId(tabPtr.p->m_tableId);
  2020. req->setAttributeMask(attrMask);
  2021. suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, 
  2022. signal, CreateTrigReq::SignalLength, JBB);
  2023.       } else {
  2024. /**
  2025.  * Faking that a trigger has been created in order to
  2026.  * simulate the proper behaviour.
  2027.  * Perhaps this should be a dummy signal instead of 
  2028.  * (ab)using CREATE_TRIG_CONF.
  2029.  */ 
  2030. CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtrSend();
  2031. conf->setConnectionPtr(ptrI);
  2032. conf->setTableId(tabPtr.p->m_tableId);
  2033. conf->setTriggerId(latestTriggerId);
  2034. suma.sendSignal(SUMA_REF,GSN_CREATE_TRIG_CONF,
  2035. signal, CreateTrigConf::SignalLength, JBB);
  2036.   
  2037.       }
  2038.     }
  2039.     m_currentTable++;
  2040.   }
  2041.   m_latestTriggerId = latestTriggerId;
  2042. }
  2043. void
  2044. SumaParticipant::SyncRecord::createAttributeMask(AttributeMask& mask, 
  2045.  Table * table){
  2046.   jam();
  2047.   mask.clear();
  2048.   DataBuffer<15>::DataBufferIterator it;
  2049.   LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, table->m_attributes);
  2050.   for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
  2051.     mask.set(* it.data);
  2052.   }
  2053. }
  2054. void
  2055. SumaParticipant::SyncRecord::runCREATE_TRIG_CONF(Signal* signal){
  2056.   jam();
  2057.   
  2058.   CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
  2059.   const Uint32 triggerId = conf->getTriggerId();
  2060.   Uint32 type = (triggerId >> 16) & 0x3;
  2061.   Uint32 tableId = conf->getTableId();
  2062.   
  2063.   TablePtr tabPtr;
  2064.   ndbrequire(suma.c_tables.find(tabPtr, tableId));
  2065.   ndbrequire(type < 3);
  2066.   tabPtr.p->m_triggerIds[type] = triggerId;
  2067.   tabPtr.p->m_hasTriggerDefined[type]++;
  2068.   if(triggerId == m_latestTriggerId){
  2069.     jam();
  2070.     nextTrigger(signal);
  2071.   }
  2072. }
  2073. void
  2074. SumaParticipant::SyncRecord::completeTrigger(Signal* signal){
  2075.   jam();
  2076.   SubscriptionPtr subPtr;
  2077.   CRASH_INSERTION(13013);
  2078. #ifdef EVENT_PH3_DEBUG
  2079.   ndbout_c("SumaParticipant: trigger completed");
  2080. #endif
  2081.   Uint32 gci;
  2082.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  2083.   ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  2084.   SubscriberPtr subbPtr;
  2085.   {
  2086.     bool found = false;
  2087.     for(suma.c_prepDataSubscribers.first(subbPtr);
  2088. !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) {
  2089.       jam();
  2090.       if(subbPtr.p->m_subPtrI == subPtr.i) {
  2091. jam();
  2092. found = true;
  2093. break;
  2094.       }
  2095.     }
  2096.     ndbrequire(found);
  2097.     gci = suma.getFirstGCI(signal);
  2098.     subbPtr.p->m_firstGCI = gci;
  2099.     suma.c_prepDataSubscribers.remove(subbPtr);
  2100.     suma.c_dataSubscribers.add(subbPtr);
  2101.   }
  2102.   suma.sendSubStartComplete(signal, subbPtr, gci,  SubscriptionData::TableData);
  2103. }
  2104. void
  2105. SumaParticipant::SyncRecord::startDropTrigger(Signal* signal){
  2106.   jam();
  2107.   m_currentTable = 0;
  2108.   m_latestTriggerId = RNIL;
  2109.   nextDropTrigger(signal);
  2110. }
  2111. void
  2112. SumaParticipant::SyncRecord::nextDropTrigger(Signal* signal){
  2113.   jam();
  2114.   TableList::DataBufferIterator it;
  2115.   
  2116.   if(!m_tableList.position(it, m_currentTable)){
  2117.     completeDropTrigger(signal);
  2118.     return;
  2119.   }
  2120.   SubscriptionPtr subPtr;
  2121.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  2122.   ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  2123.   const Uint32 RT_BREAK = 48;
  2124.   Uint32 latestTriggerId = 0;
  2125.   for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){
  2126.     jam();
  2127.     TablePtr tabPtr;
  2128. #if 0
  2129.     ndbout_c("nextDropTrigger tableid %u", *it.data);
  2130. #endif
  2131.     ndbrequire(suma.c_tables.find(tabPtr, * it.data));
  2132.     for(Uint32 j = 0; j<3; j++){
  2133.       jam();
  2134.       ndbrequire(tabPtr.p->m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
  2135.       i++;
  2136.       latestTriggerId = tabPtr.p->m_triggerIds[j];
  2137.       if(tabPtr.p->m_hasTriggerDefined[j] == 1) {
  2138. jam();
  2139. DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
  2140. req->setConnectionPtr(ptrI);
  2141. req->setUserRef(SUMA_REF); // Sending to myself
  2142. req->setRequestType(DropTrigReq::RT_USER);
  2143. req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
  2144. req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
  2145. req->setIndexId(RNIL);
  2146. req->setTableId(tabPtr.p->m_tableId);
  2147. req->setTriggerId(latestTriggerId);
  2148. req->setTriggerEvent((TriggerEvent::Value)j);
  2149. #if 0
  2150. ndbout_c("DROPPING trigger %u = %u %u %u on table %u[%u]",
  2151.  latestTriggerId,TriggerType::SUBSCRIPTION_BEFORE,
  2152.  TriggerActionTime::TA_DETACHED, j, tabPtr.p->m_tableId, j);
  2153. #endif
  2154. suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
  2155. signal, DropTrigReq::SignalLength, JBB);
  2156.       } else {
  2157. jam();
  2158. ndbrequire(tabPtr.p->m_hasTriggerDefined[j] > 1);
  2159. /**
  2160.  * Faking that a trigger has been dropped in order to
  2161.  * simulate the proper behaviour.
  2162.  * Perhaps this should be a dummy signal instead of 
  2163.  * (ab)using DROP_TRIG_CONF.
  2164.  */ 
  2165. DropTrigConf * conf = (DropTrigConf*)signal->getDataPtrSend();
  2166. conf->setConnectionPtr(ptrI);
  2167. conf->setTableId(tabPtr.p->m_tableId);
  2168. conf->setTriggerId(latestTriggerId);
  2169. suma.sendSignal(SUMA_REF,GSN_DROP_TRIG_CONF,
  2170. signal, DropTrigConf::SignalLength, JBB);
  2171.       }
  2172.     }
  2173.     m_currentTable++;
  2174.   }
  2175.   m_latestTriggerId = latestTriggerId;
  2176. }
  2177. void
  2178. SumaParticipant::SyncRecord::runDROP_TRIG_REF(Signal* signal){
  2179.   jam();
  2180.   DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
  2181.   if (ref->getErrorCode() != DropTrigRef::TriggerNotFound){
  2182.     ndbrequire(false);
  2183.   }
  2184.   const Uint32 triggerId = ref->getTriggerId();
  2185.   Uint32 tableId = ref->getTableId();
  2186.   runDropTrig(signal, triggerId, tableId);
  2187. }
  2188. void
  2189. SumaParticipant::SyncRecord::runDROP_TRIG_CONF(Signal* signal){
  2190.   jam();
  2191.   
  2192.   DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
  2193.   const Uint32 triggerId = conf->getTriggerId();
  2194.   Uint32 tableId = conf->getTableId();
  2195.   runDropTrig(signal, triggerId, tableId);
  2196. }
  2197. void
  2198. SumaParticipant::SyncRecord::runDropTrig(Signal* signal,
  2199.  Uint32 triggerId,
  2200.  Uint32 tableId){
  2201.   Uint32 type = (triggerId >> 16) & 0x3;
  2202.   
  2203.   TablePtr tabPtr;
  2204.   ndbrequire(suma.c_tables.find(tabPtr, tableId));
  2205.   ndbrequire(type < 3);
  2206.   ndbrequire(tabPtr.p->m_triggerIds[type] == triggerId);
  2207.   tabPtr.p->m_hasTriggerDefined[type]--;
  2208.   if (tabPtr.p->m_hasTriggerDefined[type] == 0) {
  2209.     jam();
  2210.     tabPtr.p->m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
  2211.   }
  2212.   if(triggerId == m_latestTriggerId){
  2213.     jam();
  2214.     nextDropTrigger(signal);
  2215.   }
  2216. }
  2217. void
  2218. SumaParticipant::SyncRecord::completeDropTrigger(Signal* signal){
  2219.   jam();
  2220.   SubscriptionPtr subPtr;
  2221.   CRASH_INSERTION(13014);
  2222. #if 0
  2223.   ndbout_c("trigger completed");
  2224. #endif
  2225.   suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  2226.   ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  2227.   bool found = false;
  2228.   SubscriberPtr subbPtr;
  2229.   for(suma.c_prepDataSubscribers.first(subbPtr);
  2230.       !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) {
  2231.     jam();
  2232.     if(subbPtr.p->m_subPtrI == subPtr.i) {
  2233.       jam();
  2234.       found = true;
  2235.       break;
  2236.     }
  2237.   }
  2238.   ndbrequire(found);
  2239.   suma.sendSubStopComplete(signal, subbPtr);
  2240. }
  2241. /**********************************************************
  2242.  * Scan data interface
  2243.  *
  2244.  * Assumption: one execTRANSID_AI contains all attr info
  2245.  *
  2246.  */
  2247. #define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS
  2248. #define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1
  2249. static Uint32 f_bufferLock = 0;
  2250. static Uint32 f_buffer[SUMA_BUF_SZ];
  2251. static Uint32 f_trigBufferSize = 0;
  2252. static Uint32 b_bufferLock = 0;
  2253. static Uint32 b_buffer[SUMA_BUF_SZ];
  2254. static Uint32 b_trigBufferSize = 0;
  2255. void
  2256. SumaParticipant::execTRANSID_AI(Signal* signal){
  2257.   jamEntry();
  2258.   CRASH_INSERTION(13015);
  2259.   TransIdAI * const data = (TransIdAI*)signal->getDataPtr();
  2260.   const Uint32 opPtrI = data->connectPtr;
  2261.   const Uint32 length = signal->length() - 3;
  2262.   if(f_bufferLock == 0){
  2263.     f_bufferLock = opPtrI;
  2264.   } else {
  2265.     ndbrequire(f_bufferLock == opPtrI);
  2266.   }
  2267.   
  2268.   Ptr<SyncRecord> syncPtr;
  2269.   c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
  2270.   
  2271.   Uint32 sum = 0;
  2272.   Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
  2273.   Uint32 * headers = f_buffer;
  2274.   const Uint32 * src = &data->attrData[0];
  2275.   const Uint32 * const end = &src[length];
  2276.   
  2277.   const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
  2278.   for(Uint32 i = 0; i<attribs; i++){
  2279.     Uint32 tmp = * src++;
  2280.     * headers++ = tmp;
  2281.     Uint32 len = AttributeHeader::getDataSize(tmp);
  2282.     
  2283.     memcpy(dst, src, 4 * len);
  2284.     dst += len;
  2285.     src += len;
  2286.     sum += len;
  2287.   }
  2288.   
  2289.   ndbrequire(src == end);
  2290.   /**
  2291.    * Send data to subscriber
  2292.    */
  2293.   LinearSectionPtr ptr[3];
  2294.   ptr[0].p = f_buffer;
  2295.   ptr[0].sz = attribs;
  2296.   
  2297.   ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
  2298.   ptr[1].sz = sum;
  2299.   SubscriptionPtr subPtr;
  2300.   c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI);
  2301.   
  2302.   /**
  2303.    * Initialize signal
  2304.    */  
  2305.   SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
  2306.   Uint32 ref = subPtr.p->m_subscriberRef;
  2307.   sdata->tableId = syncPtr.p->m_currentTableId;
  2308.   sdata->senderData = subPtr.p->m_subscriberData;
  2309.   sdata->operation = 3; // Scan
  2310.   sdata->gci = 1; // Undefined
  2311. #if PRINT_ONLY
  2312.   ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
  2313. #else
  2314.   sendSignal(ref,
  2315.      GSN_SUB_TABLE_DATA,
  2316.      signal, 
  2317.      SubTableData::SignalLength, JBB,
  2318.      ptr, 2);
  2319. #endif
  2320.   
  2321.   /**
  2322.    * Reset f_bufferLock
  2323.    */
  2324.   f_bufferLock = 0;
  2325. }
  2326. /**********************************************************
  2327.  *
  2328.  * Trigger data interface
  2329.  *
  2330.  */
  2331. void
  2332. SumaParticipant::execTRIG_ATTRINFO(Signal* signal){
  2333.   jamEntry();
  2334.   
  2335.   CRASH_INSERTION(13016);
  2336.   TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr();
  2337.   const Uint32 trigId = trg->getTriggerId();
  2338.   const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength;
  2339.   if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){
  2340.     jam();
  2341.     ndbrequire(b_bufferLock == trigId);
  2342.     memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
  2343.     b_trigBufferSize += dataLen;
  2344.     // printf("before values %u %u %un",trigId, dataLen,  b_trigBufferSize);
  2345.   } else {
  2346.     jam();
  2347.     if(f_bufferLock == 0){
  2348.       f_bufferLock = trigId;
  2349.       f_trigBufferSize = 0;
  2350.       b_bufferLock = trigId;
  2351.       b_trigBufferSize = 0;
  2352.     } else {
  2353.       ndbrequire(f_bufferLock == trigId);
  2354.     }
  2355.     memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
  2356.     f_trigBufferSize += dataLen;
  2357.   }
  2358. }
  2359. #ifdef NODEFAIL_DEBUG2
  2360. static int theCounts[64] = {0};
  2361. #endif
  2362. Uint32 
  2363. Suma::getStoreBucket(Uint32 v)
  2364. {
  2365.   // id will contain id to responsible suma or 
  2366.   // RNIL if we don't have nodegroup info yet
  2367.   const Uint32 N = NO_OF_BUCKETS;
  2368.   const Uint32 D = v % N;            // Distibution key
  2369.   return D;
  2370. }
  2371. Uint32 
  2372. Suma::getResponsibleSumaNodeId(Uint32 D)
  2373. {
  2374.   // id will contain id to responsible suma or 
  2375.   // RNIL if we don't have nodegroup info yet
  2376.   Uint32 id;
  2377.   if (c_restartLock) {
  2378.     jam();
  2379.     //    ndbout_c("c_restartLock");
  2380.     id = RNIL;
  2381.   } else {
  2382.     jam();
  2383.     id = RNIL;
  2384.     const Uint32 n = c_noNodesInGroup; // Number nodes in node group
  2385.     const Uint32 C1 = D / n;
  2386.     const Uint32 C2 = D - C1*n; // = D % n;
  2387.     const Uint32 C = C2 + C1 % n;
  2388.     for (Uint32 i = 0; i < n; i++) {
  2389.       jam();
  2390.       id = c_nodesInGroup[(C + i) % n];
  2391.       if (c_aliveNodes.get(id) &&
  2392.   !c_preparingNodes.get(id)) {
  2393.         jam();
  2394. break;
  2395.       }//if
  2396.     }
  2397. #ifdef NODEFAIL_DEBUG2
  2398.     theCounts[id]++;
  2399.     ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u",
  2400.      n,D, id, theCounts[id]);
  2401. #endif
  2402.   }
  2403.   return id;
  2404. }
  2405. Uint32
  2406. SumaParticipant::decideWhoToSend(Uint32 nBucket, Uint32 gci){
  2407.   bool replicaFlag = true;
  2408.   Uint32 nId = RNIL;
  2409.   // bucket active/not active set by GCP_COMPLETE
  2410.   if (c_buckets[nBucket].active) {
  2411.     if (c_buckets[nBucket].handover && c_buckets[nBucket].handoverGCI <= gci) {
  2412.       jam();
  2413.       replicaFlag = true; // let the other node send this
  2414.       nId = RNIL;
  2415.       // mark this as started, if we get a node failiure now we have some lost stuff
  2416.       c_buckets[nBucket].handover_started = true;
  2417.     } else {
  2418.       jam();
  2419.       replicaFlag = false;
  2420.       nId = refToNode(reference());
  2421.     }
  2422.   } else {
  2423.     nId  = getResponsibleSumaNodeId(nBucket);
  2424.     replicaFlag = !(nId == refToNode(reference()));
  2425.     
  2426.     if (!replicaFlag) {
  2427.       if (!c_buckets[nBucket].handover) {
  2428. jam();
  2429. // appearently a node has failed and we are taking over sending
  2430. // from that bucket.  Now we need to go back to latest completed
  2431. // GCI.  Handling will depend on Subscriber and Subscription
  2432. // TODO, for now we make an easy takeover
  2433. if (gci < c_nodeFailGCI)
  2434.   c_lastInconsistentGCI = gci;
  2435. // we now have responsability for this bucket and we're actively
  2436. // sending from that
  2437. c_buckets[nBucket].active = true;
  2438. #ifdef HANDOVER_DEBUG
  2439. ndbout_c("Takeover Bucket %u", nBucket);
  2440. #endif
  2441.       } else if (c_buckets[nBucket].handoverGCI > gci) {
  2442. jam();
  2443. replicaFlag = true; // handover going on, but don't start sending yet
  2444. nId = RNIL;
  2445.       } else {
  2446. jam();
  2447. #ifdef HANDOVER_DEBUG
  2448. ndbout_c("Possible error: Will send from GCI = %u", gci);
  2449. #endif
  2450. }
  2451.     }
  2452.   }
  2453.   
  2454. #ifdef NODEFAIL_DEBUG2
  2455.   ndbout_c("Suma:bucket %u, responsible id = %u, replicaFlag = %u",
  2456.    nBucket, nId, (Uint32)replicaFlag);
  2457. #endif
  2458.   return replicaFlag;
  2459. }
  2460. void
  2461. SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){
  2462.   jamEntry();
  2463.   CRASH_INSERTION(13016);
  2464.   FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
  2465.   const Uint32 trigId    = trg->getTriggerId();
  2466.   const Uint32 hashValue = trg->getHashValue();
  2467.   const Uint32 gci       = trg->getGCI();
  2468.   const Uint32 event     = trg->getTriggerEvent();
  2469.   const Uint32 triggerId = trg->getTriggerId();
  2470.   Uint32 tableId         = triggerId & 0xFFFF;
  2471.   ndbrequire(f_bufferLock == trigId);
  2472.   
  2473. #ifdef EVENT_DEBUG2
  2474.   ndbout_c("SumaParticipant::execFIRE_TRIG_ORD");
  2475. #endif
  2476.   Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
  2477.   ndbrequire(sz == f_trigBufferSize);
  2478.   /**
  2479.    * Reformat as "all headers" + "all data"
  2480.    */
  2481.   Uint32 dataLen   = 0;
  2482.   Uint32 noOfAttrs = 0;
  2483.   Uint32 * src     = f_buffer;
  2484.   Uint32 * headers = signal->theData + 25;
  2485.   Uint32 * dst     = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
  2486.   LinearSectionPtr ptr[3];
  2487.   int nptr;
  2488.   ptr[0].p  = headers;
  2489.   ptr[1].p  = dst;
  2490.   while(sz > 0){
  2491.     jam();
  2492.     Uint32 tmp = * src ++;
  2493.     * headers ++ = tmp;
  2494.     Uint32 len = AttributeHeader::getDataSize(tmp);
  2495.     memcpy(dst, src, 4 * len);
  2496.     dst += len;
  2497.     src += len;
  2498.     
  2499.     noOfAttrs++;
  2500.     dataLen += len;
  2501.     sz -= (1 + len);
  2502.   }
  2503.   ndbrequire(sz == 0);
  2504.   ptr[0].sz = noOfAttrs;
  2505.   ptr[1].sz = dataLen;
  2506.   if (b_trigBufferSize > 0) {
  2507.     jam();
  2508.     ptr[2].p  = b_buffer;
  2509.     ptr[2].sz = b_trigBufferSize;
  2510.     nptr = 3;
  2511.   } else {
  2512.     jam();
  2513.     nptr = 2;
  2514.   }
  2515.   // right now only for tableEvent
  2516.   bool replicaFlag = decideWhoToSend(getStoreBucket(hashValue), gci);
  2517.   /**
  2518.    * Signal to subscriber(s)
  2519.    */
  2520.   SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
  2521.   data->gci            = gci;
  2522.   data->tableId        = tableId;
  2523.   data->operation      = event;
  2524.   data->noOfAttributes = noOfAttrs;
  2525.   data->dataSize       =  dataLen;
  2526.   SubscriberPtr subbPtr;
  2527.   for(c_dataSubscribers.first(subbPtr); !subbPtr.isNull();
  2528.       c_dataSubscribers.next(subbPtr)){
  2529.     if (subbPtr.p->m_firstGCI > gci) {
  2530. #ifdef EVENT_DEBUG
  2531.       ndbout_c("m_firstGCI = %u, gci = %u", subbPtr.p->m_firstGCI, gci);
  2532. #endif
  2533.       jam();
  2534.       // we're either restarting or it's a newly created subscriber
  2535.       // and waiting for the right gci
  2536.       continue;
  2537.     }
  2538.     jam();
  2539.     const Uint32 ref = subbPtr.p->m_subscriberRef;
  2540.     //    ndbout_c("ref = %u", ref);
  2541.     const Uint32 subdata = subbPtr.p->m_subscriberData;
  2542.     data->senderData = subdata;
  2543.     /*
  2544.      * get subscription ptr for this subscriber
  2545.      */
  2546.     SubscriptionPtr subPtr;
  2547.     c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
  2548.     if(!subPtr.p->m_tables[tableId]) {
  2549.       jam();
  2550.       continue;
  2551.       //continue in for-loop if the table is not part of 
  2552.       //the subscription. Otherwise, send data to subscriber.
  2553.     }
  2554.    
  2555.     if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {
  2556.       if (replicaFlag) {
  2557. jam();
  2558. c_failoverBuffer.subTableData(gci,NULL,0);
  2559. continue;
  2560.       }
  2561.       jam();
  2562.       Uint32 tmp = data->logType;
  2563.       if (c_lastInconsistentGCI == data->gci) {
  2564. data->setGCINotConsistent();
  2565.       }
  2566. #ifdef HANDOVER_DEBUG
  2567.       {
  2568. static int aLongGCIName = 0;
  2569. if (data->gci != aLongGCIName) {
  2570.   aLongGCIName = data->gci;
  2571.   ndbout_c("sent from GCI = %u", aLongGCIName);
  2572. }
  2573.       }
  2574. #endif
  2575.       sendSignal(ref, GSN_SUB_TABLE_DATA, signal,
  2576.                  SubTableData::SignalLength, JBB, ptr, nptr);
  2577.       data->logType = tmp;
  2578.     } else {
  2579.       ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId());
  2580.       jam();
  2581. #if PRINT_ONLY
  2582.       ndbout_c("GSN_SUB_TABLE_DATA to %s: op: %d #attr: %d len: %d",
  2583.        getBlockName(refToBlock(ref)), 
  2584.        noOfAttrs, dataLen);
  2585.     
  2586. #else
  2587. #ifdef HANDOVER_DEBUG
  2588.       {
  2589. static int aLongGCIName2 = 0;
  2590. if (data->gci != aLongGCIName2) {
  2591.   aLongGCIName2 = data->gci;
  2592.   ndbout_c("(EXECUTE_DIRECT) sent from GCI = %u to %u", aLongGCIName2, ref);
  2593. }
  2594.       }
  2595. #endif
  2596.       EXECUTE_DIRECT(refToBlock(ref), GSN_SUB_TABLE_DATA, signal,
  2597.      SubTableData::SignalLength);  
  2598.       jamEntry();
  2599. #endif    
  2600.     }
  2601.   }
  2602.   
  2603.   /**
  2604.    * Reset f_bufferLock
  2605.    */
  2606.   f_bufferLock = 0;
  2607.   b_bufferLock = 0;
  2608. }
  2609. void
  2610. SumaParticipant::execSUB_GCP_COMPLETE_REP(Signal* signal){
  2611.   jamEntry();
  2612.   SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
  2613.   Uint32 gci = rep->gci;
  2614.   c_lastCompleteGCI = gci;
  2615.   /**
  2616.    * Signal to subscriber(s)
  2617.    */
  2618.   SubscriberPtr subbPtr;
  2619.   SubscriptionPtr subPtr;
  2620.   c_dataSubscribers.first(subbPtr);
  2621.   for(; !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
  2622.     if (subbPtr.p->m_firstGCI > gci) {
  2623.       jam();
  2624.       // we don't send SUB_GCP_COMPLETE_REP for incomplete GCI's
  2625.       continue;
  2626.     }
  2627.     const Uint32 ref = subbPtr.p->m_subscriberRef;
  2628.     rep->senderRef  = ref;
  2629.     rep->senderData = subbPtr.p->m_subscriberData;
  2630.     c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
  2631. #if PRINT_ONLY
  2632.     ndbout_c("GSN_SUB_GCP_COMPLETE_REP to %s:",
  2633.      getBlockName(refToBlock(ref)));
  2634. #else
  2635.     CRASH_INSERTION(13018);
  2636.     if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent)
  2637.       {
  2638. jam();
  2639. sendSignal(ref, GSN_SUB_GCP_COMPLETE_REP, signal,
  2640.    SubGcpCompleteRep::SignalLength, JBB);
  2641.       }
  2642.     else
  2643.       {
  2644. jam();
  2645. ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId());
  2646. EXECUTE_DIRECT(refToBlock(ref), GSN_SUB_GCP_COMPLETE_REP, signal,
  2647.        SubGcpCompleteRep::SignalLength);  
  2648. jamEntry();
  2649.       }
  2650. #endif    
  2651.   }
  2652.   if (c_handoverToDo) {
  2653.     jam();
  2654.     c_handoverToDo = false;
  2655.     for( int i = 0; i < NO_OF_BUCKETS; i++) {
  2656.       if (c_buckets[i].handover) {
  2657. if (c_buckets[i].handoverGCI > gci) {
  2658.   jam();
  2659.   c_handoverToDo = true; // still waiting for the right GCI
  2660.   break; /* since all handover should happen at the same time
  2661.   * we can break here
  2662.   */
  2663. } else {
  2664.   c_buckets[i].handover = false;
  2665. #ifdef HANDOVER_DEBUG
  2666.   ndbout_c("Handover Bucket %u", i);
  2667. #endif
  2668.   if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
  2669.     // my bucket to be handed over to me
  2670.     ndbrequire(!c_buckets[i].active);
  2671.     jam();
  2672.     c_buckets[i].active = true;
  2673.   } else {
  2674.     // someone else's bucket to handover to
  2675.     ndbrequire(c_buckets[i].active);
  2676.     jam();
  2677.     c_buckets[i].active = false;
  2678.   }
  2679. }
  2680.       }
  2681.     }
  2682.   }
  2683. }
  2684. /***********************************************************
  2685.  *
  2686.  * Embryo to syncronize the Suma's so as to know if a subscriber
  2687.  * has received a GCP_COMPLETE from all suma's or not
  2688.  *
  2689.  */
  2690. void
  2691. SumaParticipant::runSUB_GCP_COMPLETE_ACC(Signal* signal){
  2692.   jam();
  2693.   SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr();
  2694.   Uint32 gci = acc->rep.gci;
  2695. #ifdef EVENT_DEBUG
  2696.   ndbout_c("SumaParticipant::runSUB_GCP_COMPLETE_ACC gci = %u", gci);
  2697. #endif
  2698.   c_failoverBuffer.subGcpCompleteRep(gci);
  2699. }
  2700. void
  2701. Suma::execSUB_GCP_COMPLETE_ACC(Signal* signal){
  2702.   jamEntry();
  2703.   if (RtoI(signal->getSendersBlockRef(), false) != RNIL) {
  2704.     jam();
  2705.     // Ack from other SUMA
  2706.     runSUB_GCP_COMPLETE_ACC(signal);
  2707.     return;
  2708.   }
  2709.   jam();
  2710.   // Ack from User and not an acc from other SUMA, redistribute in nodegroup
  2711.   SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr();
  2712.   Uint32 gci = acc->rep.gci;
  2713.   Uint32 senderRef  = acc->rep.senderRef;
  2714.   Uint32 subscriberData = acc->rep.subscriberData;
  2715.   
  2716. #ifdef EVENT_DEBUG
  2717.   ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = %u", gci);
  2718. #endif
  2719.   bool moreToCome = false;
  2720.   SubscriberPtr subbPtr;
  2721.   for(c_dataSubscribers.first(subbPtr);
  2722.       !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
  2723. #ifdef EVENT_DEBUG
  2724.     ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC %u == %u && %u == %u",
  2725.      subbPtr.p->m_subscriberRef,
  2726.      senderRef,
  2727.      subbPtr.p->m_subscriberData,
  2728.      subscriberData);
  2729. #endif
  2730.     if (subbPtr.p->m_subscriberRef == senderRef &&
  2731. subbPtr.p->m_subscriberData == subscriberData) {
  2732.       jam();
  2733. #ifdef EVENT_DEBUG
  2734.       ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = FOUND SUBSCRIBER");
  2735. #endif
  2736.       subbPtr.p->m_lastGCI = gci;
  2737.     } else if (subbPtr.p->m_lastGCI < gci) {
  2738.       jam();
  2739.       if (subbPtr.p->m_firstGCI <= gci)
  2740. moreToCome = true;
  2741.     } else
  2742.       jam();
  2743.   }
  2744.   
  2745.   if (!moreToCome) {
  2746.     // tell the other SUMA's that I'm done with this GCI
  2747.     jam();
  2748.     for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
  2749.       Uint32 id = c_nodesInGroup[i];
  2750.       Uint32 ref = calcSumaBlockRef(id);
  2751.       if ((ref != reference()) && c_aliveNodes.get(id)) {
  2752. jam();
  2753. sendSignal(ref, GSN_SUB_GCP_COMPLETE_ACC, signal,
  2754.    SubGcpCompleteAcc::SignalLength, JBB);
  2755.       } else
  2756. jam();
  2757.     }
  2758.   }
  2759. }
  2760. static Uint32 tmpFailoverBuffer[512];
  2761. //SumaParticipant::FailoverBuffer::FailoverBuffer(DataBuffer<15>::DataBufferPool & p)
  2762. //  :  m_dataList(p), 
  2763. SumaParticipant::FailoverBuffer::FailoverBuffer()
  2764.   :
  2765.      c_gcis(tmpFailoverBuffer), c_sz(512), c_first(0), c_next(0), c_full(false)
  2766. {
  2767. }
  2768. bool SumaParticipant::FailoverBuffer::subTableData(Uint32 gci, Uint32 *src, int sz)
  2769. {
  2770.   bool ok = true;
  2771.   if (c_full) {
  2772.     ok = false;
  2773. #ifdef EVENT_DEBUG
  2774.     ndbout_c("Suma::FailoverBuffer::SubTableData buffer full gci=%u");
  2775. #endif
  2776.   } else {
  2777.     c_gcis[c_next] = gci;
  2778.     c_next++;
  2779.     if (c_next == c_sz) c_next = 0;
  2780.     if (c_next == c_first)
  2781.       c_full = true;
  2782.     //    ndbout_c("%u %u %u",c_first,c_next,c_sz);
  2783.   }
  2784.   return ok;
  2785. }
  2786. bool SumaParticipant::FailoverBuffer::subGcpCompleteRep(Uint32 gci)
  2787. {
  2788.   bool ok = true;
  2789.   //  ndbout_c("Empty");
  2790.   while (true) {
  2791.     if (c_first == c_next && !c_full)
  2792.       break;
  2793.     if (c_gcis[c_first] > gci)
  2794.       break;
  2795.     c_full = false;
  2796.     c_first++;
  2797.     if (c_first == c_sz) c_first = 0;
  2798.     //    ndbout_c("%u %u %u : ",c_first,c_next,c_sz);
  2799.   }
  2800.   return ok;
  2801. }
  2802. bool SumaParticipant::FailoverBuffer::nodeFailRep()
  2803. {
  2804.   bool ok = true;
  2805.   while (true) {
  2806.     if (c_first == c_next && !c_full)
  2807.       break;
  2808. #ifdef EVENT_DEBUG
  2809.     ndbout_c("Suma::FailoverBuffer::NodeFailRep resending gci=%u", c_gcis[c_first]);
  2810. #endif
  2811.     c_full = false;
  2812.     c_first++;
  2813.     if (c_first == c_sz) c_first = 0;
  2814.   }
  2815.   return ok;
  2816. }
  2817. /**********************************************************
  2818.  * Suma participant interface
  2819.  *
  2820.  * Stopping and removing of subscriber
  2821.  *
  2822.  */
  2823. void
  2824. SumaParticipant::execSUB_STOP_REQ(Signal* signal){
  2825.   jamEntry();
  2826.   
  2827.   CRASH_INSERTION(13019);
  2828.   SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
  2829.   Uint32 senderRef      = signal->getSendersBlockRef();
  2830.   Uint32 senderData     = req->senderData;
  2831.   Uint32 subscriberRef  = req->subscriberRef;
  2832.   Uint32 subscriberData = req->subscriberData;
  2833.   SubscriptionPtr subPtr;
  2834.   Subscription key; 
  2835.   key.m_subscriptionId  = req->subscriptionId;
  2836.   key.m_subscriptionKey = req->subscriptionKey;
  2837.   Uint32 part = req->part;
  2838.   
  2839.   if (key.m_subscriptionKey == 0 &&
  2840.       key.m_subscriptionId == 0 &&
  2841.       subscriberData == 0) {
  2842.     SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend();
  2843.     
  2844.     conf->senderRef       = reference();
  2845.     conf->senderData      = senderData;
  2846.     conf->subscriptionId  = key.m_subscriptionId;
  2847.     conf->subscriptionKey = key.m_subscriptionKey;
  2848.     conf->subscriberData  = subscriberData;
  2849.     sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
  2850.        SubStopConf::SignalLength, JBB);
  2851.     removeSubscribersOnNode(signal, refToNode(subscriberRef));
  2852.     return;
  2853.   }
  2854.   if(!c_subscriptions.find(subPtr, key)){
  2855.     jam();
  2856.     sendSubStopRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND);
  2857.     return;
  2858.   }
  2859.   
  2860.   ndbrequire(part == SubscriptionData::TableData);
  2861.   SubscriberPtr subbPtr;
  2862.   if (senderRef == reference()){
  2863.     jam();
  2864.     c_subscriberPool.getPtr(subbPtr, senderData);
  2865.     ndbrequire(subbPtr.p->m_subPtrI == subPtr.i && 
  2866.        subbPtr.p->m_subscriberRef == subscriberRef &&
  2867.        subbPtr.p->m_subscriberData == subscriberData);
  2868.     c_removeDataSubscribers.remove(subbPtr);
  2869.   } else {
  2870.     bool found = false;
  2871.     jam();
  2872.     c_dataSubscribers.first(subbPtr);
  2873.     for (;!subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
  2874.       jam();
  2875.       if (subbPtr.p->m_subPtrI == subPtr.i && 
  2876.   subbPtr.p->m_subscriberRef == subscriberRef &&
  2877.   subbPtr.p->m_subscriberData == subscriberData){
  2878. // ndbout_c("STOP_REQ: before c_dataSubscribers.release");
  2879. jam();
  2880. c_dataSubscribers.remove(subbPtr);
  2881. found = true;
  2882. break;
  2883.       }
  2884.     }
  2885.     /**
  2886.      * If we didn't find anyone, send ref
  2887.      */
  2888.     if (!found) {
  2889.       jam();
  2890.       sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND);
  2891.       return;
  2892.     }
  2893.   }
  2894.   subbPtr.p->m_senderRef  = senderRef; // store ref to requestor
  2895.   subbPtr.p->m_senderData = senderData; // store ref to requestor
  2896.   c_prepDataSubscribers.add(subbPtr);
  2897.   Ptr<SyncRecord> syncPtr;
  2898.   c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  2899.   if (syncPtr.p->m_locked) {
  2900.     jam();
  2901.     sendSubStopRef(signal, /** Error Code */ 0, true);
  2902.     return;
  2903.   }
  2904.   syncPtr.p->m_locked = true;
  2905.   syncPtr.p->startDropTrigger(signal);
  2906. }
  2907. void
  2908. SumaParticipant::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr){
  2909.   jam();
  2910.   CRASH_INSERTION(13020);
  2911.   SubscriptionPtr subPtr;
  2912.   c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
  2913.   Ptr<SyncRecord> syncPtr;
  2914.   c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  2915.   syncPtr.p->m_locked = false;
  2916.   SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
  2917.   
  2918.   conf->senderRef = reference();
  2919.   conf->senderData = subbPtr.p->m_senderData;
  2920.   conf->subscriptionId  = subPtr.p->m_subscriptionId;
  2921.   conf->subscriptionKey = subPtr.p->m_subscriptionKey;
  2922.   conf->subscriberData  = subbPtr.p->m_subscriberData;
  2923.   Uint32 senderRef = subbPtr.p->m_senderRef;
  2924.   c_prepDataSubscribers.release(subbPtr);
  2925.   sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
  2926.      SubStopConf::SignalLength, JBB);
  2927. }
  2928. void
  2929. SumaParticipant::sendSubStopRef(Signal* signal, Uint32 errCode,
  2930. bool temporary){
  2931.   jam();
  2932.   SubStopRef  * ref = (SubStopRef *)signal->getDataPtrSend();
  2933.   ref->senderRef = reference();
  2934.   ref->errorCode = errCode;
  2935.   if (temporary) {
  2936.     ref->setTemporary();
  2937.   }
  2938.   sendSignal(signal->getSendersBlockRef(), 
  2939.      GSN_SUB_STOP_REF, 
  2940.      signal, 
  2941.      SubStopRef::SignalLength,
  2942.      JBB);
  2943.   return;
  2944. }
  2945. /**************************************************************
  2946.  *
  2947.  * Removing subscription
  2948.  *
  2949.  */
  2950. void
  2951. SumaParticipant::execSUB_REMOVE_REQ(Signal* signal) {
  2952.   jamEntry();
  2953.   Uint32 senderRef = signal->getSendersBlockRef();
  2954.   CRASH_INSERTION(13021);
  2955.   const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr();
  2956.   SubscriptionPtr subPtr;
  2957.   Subscription key;
  2958.   key.m_subscriptionId  = req.subscriptionId;
  2959.   key.m_subscriptionKey = req.subscriptionKey;
  2960.   
  2961.   if(!c_subscriptions.find(subPtr, key)) {
  2962.     jam();
  2963.     sendSubRemoveRef(signal, req, (Uint32) GrepError::SUBSCRIPTION_ID_NOT_FOUND);
  2964.     return;
  2965.   }
  2966.   
  2967.   int count = 0;
  2968.   {
  2969.     jam();
  2970.     SubscriberPtr i_subbPtr;
  2971.     for(c_prepDataSubscribers.first(i_subbPtr);
  2972. !i_subbPtr.isNull(); c_prepDataSubscribers.next(i_subbPtr)){
  2973.       jam();
  2974.       if( i_subbPtr.p->m_subPtrI == subPtr.i ) {
  2975. jam();
  2976. sendSubRemoveRef(signal, req, /* ErrorCode */ 0, true);
  2977. return;
  2978. // c_prepDataSubscribers.release(subbPtr);
  2979.       }
  2980.     }
  2981.     c_dataSubscribers.first(i_subbPtr);
  2982.     while(!i_subbPtr.isNull()){
  2983.       jam();
  2984.       SubscriberPtr subbPtr = i_subbPtr;
  2985.       c_dataSubscribers.next(i_subbPtr);
  2986.       if( subbPtr.p->m_subPtrI == subPtr.i ) {
  2987. jam();
  2988. sendSubRemoveRef(signal, req, /* ErrorCode */ 0, true);
  2989. return;
  2990. /* Unfinished/untested code.  If remove should be possible
  2991.  * even if subscribers are left these have to be stopped 
  2992.  * first. See m_markRemove, m_nSubscribers. We need also to
  2993.  * block remove for this subscription so that multiple
  2994.  * removes is not possible...
  2995.  */
  2996. c_dataSubscribers.remove(subbPtr);
  2997. c_removeDataSubscribers.add(subbPtr);
  2998. count++;
  2999.       }
  3000.     }
  3001.     c_metaSubscribers.first(i_subbPtr);
  3002.     while(!i_subbPtr.isNull()){
  3003.       jam();
  3004.       SubscriberPtr subbPtr = i_subbPtr;
  3005.       c_metaSubscribers.next(i_subbPtr);
  3006.       if( subbPtr.p->m_subPtrI == subPtr.i ){
  3007. jam();
  3008. c_metaSubscribers.release(subbPtr);
  3009.       }
  3010.     }
  3011.   }
  3012.   subPtr.p->m_senderRef  = senderRef;
  3013.   subPtr.p->m_senderData = req.senderData;
  3014.   if (count > 0){
  3015.     jam();
  3016.     ndbrequire(false); // code not finalized
  3017.     subPtr.p->m_markRemove = true;
  3018.     subPtr.p->m_nSubscribers = count;
  3019.     sendSubStopReq(signal);
  3020.   } else {
  3021.     completeSubRemoveReq(signal, subPtr);
  3022.   }
  3023. }
  3024. void
  3025. SumaParticipant::completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr) {
  3026.   Uint32 subscriptionId  = subPtr.p->m_subscriptionId;
  3027.   Uint32 subscriptionKey = subPtr.p->m_subscriptionKey;
  3028.   Uint32 senderRef       = subPtr.p->m_senderRef;
  3029.   Uint32 senderData      = subPtr.p->m_senderData;
  3030.   {
  3031.     Ptr<SyncRecord> syncPtr;
  3032.     c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  3033.     syncPtr.p->release();
  3034.     c_syncPool.release(syncPtr);
  3035.   }
  3036.   //  if (subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
  3037.   //    jam();
  3038.   //    senderRef = subPtr.p->m_subscriberRef;
  3039.   //  }
  3040.   c_subscriptions.release(subPtr);
  3041.   /**
  3042.    * I was the last subscription to be remove so clear c_tables
  3043.    */
  3044. #if 0
  3045.   ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d",
  3046.    c_subscriptionPool.getSize(),c_subscriptionPool.getNoOfFree());
  3047. #endif
  3048.   if(c_subscriptionPool.getSize() == c_subscriptionPool.getNoOfFree()) {
  3049.     jam();
  3050. #if 0
  3051.     ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");
  3052. #endif
  3053.     KeyTable<Table>::Iterator it;
  3054.     for(c_tables.first(it); !it.isNull(); ){
  3055.       
  3056.       it.curr.p->release(* this);
  3057.       
  3058.       TablePtr tabPtr = it.curr;
  3059.       
  3060.       c_tables.next(it);
  3061.       c_tables.release(tabPtr);
  3062.     }
  3063.   }
  3064.   
  3065.   SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
  3066.   conf->senderRef            = reference();
  3067.   conf->senderData           = senderData;
  3068.   conf->subscriptionId       = subscriptionId;
  3069.   conf->subscriptionKey      = subscriptionKey;
  3070.   sendSignal(senderRef, GSN_SUB_REMOVE_CONF, signal,
  3071.      SubRemoveConf::SignalLength, JBB);
  3072. }
  3073. void
  3074. SumaParticipant::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req,
  3075.   Uint32 errCode, bool temporary){
  3076.   jam();
  3077.   SubRemoveRef  * ref = (SubRemoveRef *)signal->getDataPtrSend();
  3078.   ref->senderRef  = reference();
  3079.   ref->senderData = req.senderData;
  3080.   ref->err = errCode;
  3081.   if (temporary)
  3082.     ref->setTemporary();
  3083.   releaseSections(signal);
  3084.   sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF, 
  3085.      signal, SubRemoveRef::SignalLength, JBB);
  3086.   return;
  3087. }
  3088. void
  3089. SumaParticipant::Table::release(SumaParticipant & suma){
  3090.   jam();
  3091.   LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
  3092.   attrBuf.release();
  3093.   LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
  3094.   fragBuf.release();
  3095. }
  3096. void
  3097. SumaParticipant::SyncRecord::release(){
  3098.   jam();
  3099.   m_tableList.release();
  3100.   LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList);
  3101.   attrBuf.release();  
  3102. }
  3103. /**************************************************************
  3104.  *
  3105.  * Restarting remote node functions, master functionality
  3106.  * (slave does nothing special)
  3107.  * - triggered on INCL_NODEREQ calling startNode
  3108.  * - included node will issue START_ME when it's ready to start
  3109.  * the subscribers
  3110.  *
  3111.  */
  3112. Suma::Restart::Restart(Suma& s) : suma(s) {
  3113.   for (int i = 0; i < MAX_REPLICAS; i++) {
  3114.     c_okToStart[i]      = false;
  3115.     c_waitingToStart[i] = false;
  3116.   }
  3117. }
  3118. void
  3119. Suma::Restart::resetNode(Uint32 sumaRef)
  3120. {
  3121.   jam();
  3122.   int I = suma.RtoI(sumaRef);
  3123.   c_okToStart[I] = false;
  3124.   c_waitingToStart[I] = false;
  3125. }
  3126. void
  3127. Suma::Restart::startNode(Signal* signal, Uint32 sumaRef)
  3128. {
  3129.   jam();
  3130.   resetNode(sumaRef);
  3131.   // right now we can only handle restarting one node
  3132.   // at a time in a node group
  3133.   createSubscription(signal, sumaRef);
  3134. }
  3135. void 
  3136. Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef) {
  3137.   jam();
  3138.   suma.c_subscriptions.first(c_subPtr);
  3139.   nextSubscription(signal, sumaRef);
  3140. }
  3141. void 
  3142. Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef) {
  3143.   jam();
  3144.   if (c_subPtr.isNull()) {
  3145.     jam();
  3146.     completeSubscription(signal, sumaRef);
  3147.     return;
  3148.   }
  3149.   SubscriptionPtr subPtr;
  3150.   subPtr.i = c_subPtr.curr.i;
  3151.   subPtr.p = suma.c_subscriptions.getPtr(subPtr.i);
  3152.   suma.c_subscriptions.next(c_subPtr);
  3153.   SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
  3154.       
  3155.   req->subscriberRef    = suma.reference();
  3156.   req->subscriberData   = subPtr.i;
  3157.   req->subscriptionId   = subPtr.p->m_subscriptionId;
  3158.   req->subscriptionKey  = subPtr.p->m_subscriptionKey;
  3159.   req->subscriptionType = subPtr.p->m_subscriptionType |
  3160.     SubCreateReq::RestartFlag;
  3161.   switch (subPtr.p->m_subscriptionType) {
  3162.   case SubCreateReq::TableEvent:
  3163.   case SubCreateReq::SelectiveTableSnapshot:
  3164.   case SubCreateReq::DatabaseSnapshot: {
  3165.     jam();
  3166.       
  3167.     Ptr<SyncRecord> syncPtr;
  3168.     suma.c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  3169.     syncPtr.p->m_tableList.first(syncPtr.p->m_tableList_it);
  3170.     ndbrequire(!syncPtr.p->m_tableList_it.isNull());
  3171.     req->tableId = *syncPtr.p->m_tableList_it.data;
  3172.       
  3173. #if 0
  3174.     for (int i = 0; i < MAX_TABLES; i++)
  3175.       if (subPtr.p->m_tables[i]) {
  3176. req->tableId = i;
  3177. break;
  3178.       }
  3179. #endif
  3180.     suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
  3181.     SubCreateReq::SignalLength+1 /*to get table Id*/, JBB);
  3182.     return;
  3183.   }
  3184.   case SubCreateReq::SingleTableScan :
  3185.     // TODO
  3186.     jam();
  3187.     return;
  3188.   }
  3189.   ndbrequire(false);
  3190. }
  3191. void 
  3192. Suma::execSUB_CREATE_CONF(Signal* signal) {
  3193.   jamEntry();
  3194. #ifdef NODEFAIL_DEBUG
  3195.   ndbout_c("Suma::execSUB_CREATE_CONF");
  3196. #endif
  3197.   const Uint32 senderRef = signal->senderBlockRef();
  3198.   SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();
  3199.   Subscription key;
  3200.   const Uint32 subscriberData = conf->subscriberData;
  3201.   key.m_subscriptionId        = conf->subscriptionId;
  3202.   key.m_subscriptionKey       = conf->subscriptionKey;
  3203.   
  3204.   SubscriptionPtr subPtr;
  3205.   ndbrequire(c_subscriptions.find(subPtr, key));
  3206.   switch(subPtr.p->m_subscriptionType) {
  3207.   case SubCreateReq::TableEvent:
  3208.   case SubCreateReq::SelectiveTableSnapshot:
  3209.   case SubCreateReq::DatabaseSnapshot:
  3210.     {
  3211.       Ptr<SyncRecord> syncPtr;
  3212.       c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  3213.       syncPtr.p->m_tableList.next(syncPtr.p->m_tableList_it);
  3214.       if (syncPtr.p->m_tableList_it.isNull()) {
  3215. jam();
  3216. SubSyncReq *req = (SubSyncReq *)signal->getDataPtrSend();
  3217.     
  3218. req->subscriptionId  = key.m_subscriptionId;
  3219. req->subscriptionKey = key.m_subscriptionKey;
  3220. req->subscriberData  = subscriberData;
  3221. req->part            = (Uint32) SubscriptionData::MetaData;
  3222. sendSignal(senderRef, GSN_SUB_SYNC_REQ, signal,
  3223.    SubSyncReq::SignalLength, JBB);
  3224.       } else {
  3225. jam();
  3226. SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
  3227.       
  3228. req->subscriberRef    = reference();
  3229. req->subscriberData   = subPtr.i;
  3230. req->subscriptionId   = subPtr.p->m_subscriptionId;
  3231. req->subscriptionKey  = subPtr.p->m_subscriptionKey;
  3232. req->subscriptionType = subPtr.p->m_subscriptionType |
  3233.   SubCreateReq::RestartFlag |
  3234.   SubCreateReq::AddTableFlag;
  3235. req->tableId = *syncPtr.p->m_tableList_it.data;
  3236. sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal,
  3237.    SubCreateReq::SignalLength+1 /*to get table Id*/, JBB);
  3238.       }
  3239.     }
  3240.     return;
  3241.   case SubCreateReq::SingleTableScan:
  3242.     ndbrequire(false);
  3243.   }
  3244.   ndbrequire(false);
  3245. }
  3246. void 
  3247. Suma::execSUB_CREATE_REF(Signal* signal) {
  3248.   jamEntry();
  3249. #ifdef NODEFAIL_DEBUG
  3250.   ndbout_c("Suma::execSUB_CREATE_REF");
  3251. #endif
  3252.   //ndbrequire(false);
  3253. }
  3254. void 
  3255. Suma::execSUB_SYNC_CONF(Signal* signal) {
  3256.   jamEntry();
  3257. #ifdef NODEFAIL_DEBUG
  3258.   ndbout_c("Suma::execSUB_SYNC_CONF");
  3259. #endif
  3260.   Uint32 sumaRef = signal->getSendersBlockRef();
  3261.   SubSyncConf *conf = (SubSyncConf *)signal->getDataPtr();
  3262.   Subscription key;
  3263.   key.m_subscriptionId            = conf->subscriptionId;
  3264.   key.m_subscriptionKey           = conf->subscriptionKey;
  3265.   //  SubscriptionData::Part part     = (SubscriptionData::Part)conf->part;
  3266.   //  const Uint32 subscriberData     = conf->subscriberData;
  3267.   SubscriptionPtr subPtr;
  3268.   c_subscriptions.find(subPtr, key);
  3269.   switch(subPtr.p->m_subscriptionType) {
  3270.   case SubCreateReq::TableEvent:
  3271.   case SubCreateReq::SelectiveTableSnapshot:
  3272.   case SubCreateReq::DatabaseSnapshot:
  3273.     jam();
  3274.     Restart.nextSubscription(signal, sumaRef);
  3275.     return;
  3276.   case SubCreateReq::SingleTableScan:
  3277.     ndbrequire(false);
  3278.     return;
  3279.   }
  3280.   ndbrequire(false);
  3281. }
  3282. void 
  3283. Suma::execSUB_SYNC_REF(Signal* signal) {
  3284.   jamEntry();
  3285. #ifdef NODEFAIL_DEBUG
  3286.   ndbout_c("Suma::execSUB_SYNC_REF");
  3287. #endif
  3288.   //ndbrequire(false);
  3289. }
  3290. void
  3291. Suma::execSUMA_START_ME(Signal* signal) {
  3292.   jamEntry();
  3293. #ifdef NODEFAIL_DEBUG
  3294.   ndbout_c("Suma::execSUMA_START_ME");
  3295. #endif
  3296.   Restart.runSUMA_START_ME(signal, signal->getSendersBlockRef());
  3297. }
  3298. void
  3299. Suma::Restart::runSUMA_START_ME(Signal* signal, Uint32 sumaRef) {
  3300.   int I = suma.RtoI(sumaRef);
  3301.   // restarting Suma is ready for SUB_START_REQ
  3302.   if (c_waitingToStart[I]) {
  3303.     // we've waited with startSubscriber since restarting suma was not ready
  3304.     c_waitingToStart[I] = false;
  3305.     startSubscriber(signal, sumaRef);
  3306.   } else {
  3307.     // do startSubscriber as soon as its time
  3308.     c_okToStart[I] = true;
  3309.   }
  3310. }
  3311. void 
  3312. Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef) {
  3313.   jam();
  3314.   int I = suma.RtoI(sumaRef);
  3315.   if (c_okToStart[I]) {// otherwise will start when START_ME comes
  3316.     c_okToStart[I] = false;
  3317.     startSubscriber(signal, sumaRef);
  3318.   } else {
  3319.     c_waitingToStart[I] = true;
  3320.   }
  3321. }
  3322. void 
  3323. Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef) {
  3324.   jam();
  3325.   suma.c_dataSubscribers.first(c_subbPtr);
  3326.   nextSubscriber(signal, sumaRef);
  3327. }
  3328. void
  3329. Suma::Restart::sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
  3330.        Signal* signal, Uint32 sumaRef)
  3331. {
  3332.   jam();
  3333.   SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
  3334.       
  3335.   req->senderRef        = suma.reference();
  3336.   req->senderData       = subbPtr.p->m_senderData;
  3337.   req->subscriptionId   = subPtr.p->m_subscriptionId;
  3338.   req->subscriptionKey  = subPtr.p->m_subscriptionKey;
  3339.   req->part             = SubscriptionData::TableData;
  3340.   req->subscriberData   = subbPtr.p->m_subscriberData;
  3341.   req->subscriberRef    = subbPtr.p->m_subscriberRef;
  3342.       
  3343.   // restarting suma will not respond to this until startphase 5
  3344.   // since it is not until then data copying has been completed
  3345. #ifdef NODEFAIL_DEBUG
  3346.   ndbout_c("Suma::Restart::sendSubStartReq sending GSN_SUB_START_REQ id=%u key=%u",
  3347.    req->subscriptionId, req->subscriptionKey);
  3348. #endif
  3349.   suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
  3350.   signal, SubStartReq::SignalLength2, JBB);
  3351. }
  3352. void 
  3353. Suma::execSUB_START_CONF(Signal* signal) {
  3354.   jamEntry();
  3355. #ifdef NODEFAIL_DEBUG
  3356.   ndbout_c("Suma::execSUB_START_CONF");
  3357. #endif
  3358.   Uint32 sumaRef = signal->getSendersBlockRef();
  3359.   Restart.nextSubscriber(signal, sumaRef);
  3360. }
  3361. void 
  3362. Suma::execSUB_START_REF(Signal* signal) {
  3363.   jamEntry();
  3364. #ifdef NODEFAIL_DEBUG
  3365.   ndbout_c("Suma::execSUB_START_REF");
  3366. #endif
  3367.   //ndbrequire(false);
  3368. }
  3369. void 
  3370. Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef) {
  3371.   jam();
  3372.   if (c_subbPtr.isNull()) {
  3373.     jam();
  3374.     completeSubscriber(signal, sumaRef);
  3375.     return;
  3376.   }
  3377.   
  3378.   SubscriberPtr subbPtr = c_subbPtr;
  3379.   suma.c_dataSubscribers.next(c_subbPtr);
  3380.   /*
  3381.    * get subscription ptr for this subscriber
  3382.    */
  3383.   SubscriptionPtr subPtr;
  3384.   suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
  3385.   switch (subPtr.p->m_subscriptionType) {
  3386.   case SubCreateReq::TableEvent:
  3387.   case SubCreateReq::SelectiveTableSnapshot:
  3388.   case SubCreateReq::DatabaseSnapshot:
  3389.     {
  3390.       jam();
  3391.       sendSubStartReq(subPtr, subbPtr, signal, sumaRef);
  3392. #if 0
  3393.       SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
  3394.       
  3395.       req->senderRef        = reference();
  3396.       req->senderData       = subbPtr.p->m_senderData;
  3397.       req->subscriptionId   = subPtr.p->m_subscriptionId;
  3398.       req->subscriptionKey  = subPtr.p->m_subscriptionKey;
  3399.       req->part             = SubscriptionData::TableData;
  3400.       req->subscriberData   = subbPtr.p->m_subscriberData;
  3401.       req->subscriberRef    = subbPtr.p->m_subscriberRef;
  3402.       
  3403.       // restarting suma will not respond to this until startphase 5
  3404.       // since it is not until then data copying has been completed
  3405. #ifdef NODEFAIL_DEBUG
  3406.       ndbout_c("Suma::nextSubscriber sending GSN_SUB_START_REQ id=%u key=%u",
  3407.        req->subscriptionId, req->subscriptionKey);
  3408. #endif
  3409.       suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
  3410.       signal, SubStartReq::SignalLength2, JBB);
  3411. #endif
  3412.     }
  3413.   return;
  3414.   case SubCreateReq::SingleTableScan:
  3415.     ndbrequire(false);
  3416.     return;
  3417.   }
  3418.   ndbrequire(false);
  3419. }
  3420. void 
  3421. Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef) {
  3422.   completeRestartingNode(signal, sumaRef);
  3423. }
  3424. void
  3425. Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef) {
  3426.   jam();
  3427.   SumaHandoverReq * req = (SumaHandoverReq *)signal->getDataPtrSend();
  3428.   req->gci = suma.getFirstGCI(signal);
  3429.   suma.sendSignal(sumaRef, GSN_SUMA_HANDOVER_REQ, signal,
  3430.   SumaHandoverReq::SignalLength, JBB);
  3431. }
  3432. // only run on restarting suma
  3433. void
  3434. Suma::execSUMA_HANDOVER_REQ(Signal* signal)
  3435. {
  3436.   jamEntry();
  3437.   //  Uint32 sumaRef = signal->getSendersBlockRef();
  3438.   SumaHandoverReq const * req = (SumaHandoverReq *)signal->getDataPtr();
  3439.   Uint32 gci = req->gci;
  3440.   Uint32 new_gci = getFirstGCI(signal);
  3441.   if (new_gci > gci) {
  3442.     gci = new_gci;
  3443.   }
  3444.   { // all recreated subscribers at restarting SUMA start at same GCI
  3445.     SubscriberPtr subbPtr;
  3446.     for(c_dataSubscribers.first(subbPtr);
  3447. !subbPtr.isNull();
  3448. c_dataSubscribers.next(subbPtr)){
  3449.       subbPtr.p->m_firstGCI = gci;
  3450.     }
  3451.   }
  3452. #ifdef NODEFAIL_DEBUG
  3453.   ndbout_c("Suma::execSUMA_HANDOVER_REQ, gci = %u", gci);
  3454. #endif
  3455.   c_handoverToDo = false;
  3456.   c_restartLock = false;
  3457.   {
  3458. #ifdef HANDOVER_DEBUG
  3459.     int c = 0;
  3460. #endif
  3461.     for( int i = 0; i < NO_OF_BUCKETS; i++) {
  3462.       jam();
  3463.       if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
  3464. #ifdef HANDOVER_DEBUG
  3465. c++;
  3466. #endif
  3467.         jam();
  3468. c_buckets[i].active = false;
  3469. c_buckets[i].handoverGCI = gci;
  3470. c_buckets[i].handover = true;
  3471. c_buckets[i].handover_started = false;
  3472. c_handoverToDo = true;
  3473.       }
  3474.     }
  3475. #ifdef HANDOVER_DEBUG
  3476.     ndbout_c("prepared handover of bucket %u buckets", c);
  3477. #endif
  3478.   }
  3479.   for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
  3480.     jam();
  3481.     Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
  3482.     if (ref != reference()) {
  3483.       jam();
  3484.       sendSignal(ref, GSN_SUMA_HANDOVER_CONF, signal,
  3485.  SumaHandoverConf::SignalLength, JBB);
  3486.     }//if
  3487.   }
  3488. }
  3489. // only run on all but restarting suma
  3490. void
  3491. Suma::execSUMA_HANDOVER_CONF(Signal* signal) {
  3492.   jamEntry();
  3493.   Uint32 sumaRef = signal->getSendersBlockRef();
  3494.   SumaHandoverConf const * conf = (SumaHandoverConf *)signal->getDataPtr();
  3495.   Uint32 gci = conf->gci;
  3496. #ifdef HANDOVER_DEBUG
  3497.   ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
  3498. #endif
  3499.   /* TODO, if we are restarting several SUMA's (>2 in a nodegroup)
  3500.    * we have to collect all these conf's before proceding
  3501.    */
  3502.   // restarting node is now prepared and ready
  3503.   c_preparingNodes.clear(refToNode(sumaRef)); /* !! important to do before
  3504.        * below since it affects
  3505.        * getResponsibleSumaNodeId()
  3506.        */
  3507.   c_handoverToDo = false;
  3508.   // mark all active buckets really belonging to restarting SUMA
  3509.   for( int i = 0; i < NO_OF_BUCKETS; i++) {
  3510.     if (c_buckets[i].active) {
  3511.       // I'm running this bucket
  3512.       if (getResponsibleSumaNodeId(i) == refToNode(sumaRef)) {
  3513. // but it should really be the restarted node
  3514. c_buckets[i].handoverGCI = gci;
  3515. c_buckets[i].handover = true;
  3516. c_buckets[i].handover_started = false;
  3517. c_handoverToDo = true;
  3518.       }
  3519.     }
  3520.   }
  3521. }
  3522. template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);