Trix.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:30k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "Trix.hpp"
  14. #include <string.h>
  15. #include <kernel_types.h>
  16. #include <NdbOut.hpp>
  17. #include <signaldata/ReadNodesConf.hpp>
  18. #include <signaldata/NodeFailRep.hpp>
  19. #include <signaldata/DumpStateOrd.hpp>
  20. #include <signaldata/GetTabInfo.hpp>
  21. #include <signaldata/DictTabInfo.hpp>
  22. #include <signaldata/BuildIndx.hpp>
  23. #include <signaldata/SumaImpl.hpp>
  24. #include <signaldata/UtilPrepare.hpp>
  25. #include <signaldata/UtilExecute.hpp>
  26. #include <signaldata/UtilRelease.hpp>
  27. #include <SectionReader.hpp>
  28. #include <AttributeHeader.hpp>
  29. #define CONSTRAINT_VIOLATION 893
  30. #define DEBUG(x) { ndbout << "TRIX::" << x << endl; }
  31. /**
  32.  *
  33.  */
  34. Trix::Trix(const Configuration & conf) :
  35.   SimulatedBlock(TRIX, conf),
  36.   c_theNodes(c_theNodeRecPool),
  37.   c_masterNodeId(0),
  38.   c_masterTrixRef(0),
  39.   c_noNodesFailed(0),
  40.   c_noActiveNodes(0),
  41.   c_theSubscriptions(c_theSubscriptionRecPool)
  42. {
  43.   BLOCK_CONSTRUCTOR(Trix);
  44.   // Add received signals
  45.   addRecSignal(GSN_STTOR,  &Trix::execSTTOR);
  46.   addRecSignal(GSN_NDB_STTOR,  &Trix::execNDB_STTOR); // Forwarded from DICT
  47.   addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF);
  48.   addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF);
  49.   addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP);
  50.   addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ);
  51.   addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD);
  52.   // Index build
  53.   addRecSignal(GSN_BUILDINDXREQ, &Trix::execBUILDINDXREQ);
  54.   // Dump testing
  55.   addRecSignal(GSN_BUILDINDXCONF, &Trix::execBUILDINDXCONF);
  56.   addRecSignal(GSN_BUILDINDXREF, &Trix::execBUILDINDXREF);
  57.   addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF);
  58.   addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF);
  59.   addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF);
  60.   addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF);
  61.   addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF);
  62.   addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF);
  63.   // Suma signals
  64.   addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF);
  65.   addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF);
  66.   addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF);
  67.   addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF);
  68.   addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
  69.   addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
  70.   addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
  71.   addRecSignal(GSN_SUB_META_DATA, &Trix::execSUB_META_DATA);
  72.   addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);
  73.   // Allocate pool sizes
  74.   c_theAttrOrderBufferPool.setSize(100);
  75.   c_theSubscriptionRecPool.setSize(100);
  76.   ArrayList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool);
  77.   SubscriptionRecPtr subptr;
  78.   while(subscriptions.seize(subptr) == true) {
  79.     new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool);
  80.   }
  81.   subscriptions.release();
  82. }
  83. /**
  84.  *
  85.  */
  86. Trix::~Trix()
  87. {
  88. }
  89. /**
  90.  *
  91.  */
  92. void Trix::execSTTOR(Signal* signal) 
  93. {
  94.   jamEntry();                            
  95.   //const Uint32 startphase   = signal->theData[1];
  96.   const Uint32 theSignalKey = signal->theData[6];
  97.   
  98.   signal->theData[0] = theSignalKey;
  99.   signal->theData[3] = 1;
  100.   signal->theData[4] = 255; // No more start phases from missra
  101.   sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
  102.   return;
  103. }//Trix::execSTTOR()
  104. /**
  105.  *
  106.  */
  107. void Trix::execNDB_STTOR(Signal* signal) 
  108. {
  109.   jamEntry();                            
  110.   BlockReference ndbcntrRef = signal->theData[0];  
  111.   Uint16 startphase = signal->theData[2];      /* RESTART PHASE           */
  112.   Uint16 mynode = signal->theData[1];  
  113.   //Uint16 restarttype = signal->theData[3];  
  114.   //UintR configInfo1 = signal->theData[6];     /* CONFIGRATION INFO PART 1 */
  115.   //UintR configInfo2 = signal->theData[7];     /* CONFIGRATION INFO PART 2 */
  116.   switch (startphase) {
  117.   case 3:
  118.     jam();
  119.     /* SYMBOLIC START PHASE 4             */
  120.     /* ABSOLUTE PHASE 5                   */
  121.     /* REQUEST NODE IDENTITIES FROM DBDIH */
  122.     signal->theData[0] = calcTrixBlockRef(mynode);
  123.     sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB);
  124.     return;
  125.     break;
  126.   case 6:
  127.     break;
  128.   default:
  129.     break;
  130.   }
  131. }
  132. /**
  133.  *
  134.  */
  135. void Trix::execREAD_NODESCONF(Signal* signal)
  136. {
  137.   jamEntry();
  138.   ReadNodesConf * const  readNodes = (ReadNodesConf *)signal->getDataPtr();
  139.   //Uint32 noOfNodes   = readNodes->noOfNodes;
  140.   NodeRecPtr nodeRecPtr;
  141.   c_masterNodeId = readNodes->masterNodeId;
  142.   c_masterTrixRef = RNIL;
  143.   c_noNodesFailed = 0;
  144.   for(unsigned i = 0; i < MAX_NDB_NODES; i++) {
  145.     jam();
  146.     if(NodeBitmask::get(readNodes->allNodes, i)) {
  147.       // Node is defined
  148.       jam();
  149.       ndbrequire(c_theNodes.seizeId(nodeRecPtr, i));
  150.       nodeRecPtr.p->trixRef = calcTrixBlockRef(i);
  151.       if (i == c_masterNodeId) {
  152.         c_masterTrixRef = nodeRecPtr.p->trixRef;
  153.       }
  154.       if(NodeBitmask::get(readNodes->inactiveNodes, i)){
  155.         // Node is not active
  156. jam();
  157. /**-----------------------------------------------------------------
  158.  * THIS NODE IS DEFINED IN THE CLUSTER BUT IS NOT ALIVE CURRENTLY.
  159.  * WE ADD THE NODE TO THE SET OF FAILED NODES AND ALSO SET THE
  160.  * BLOCKSTATE TO BUSY TO AVOID ADDING TRIGGERS OR INDEXES WHILE 
  161.  * NOT ALL NODES ARE ALIVE.
  162.  *------------------------------------------------------------------*/
  163. arrGuard(c_noNodesFailed, MAX_NDB_NODES);
  164. nodeRecPtr.p->alive = false;
  165. c_noNodesFailed++;
  166. c_blockState = Trix::NODE_FAILURE;
  167.       }
  168.       else {
  169.         // Node is active
  170.         jam();
  171.         c_noActiveNodes++;
  172.         nodeRecPtr.p->alive = true;
  173.       }
  174.     }
  175.   }
  176.   if (c_noNodesFailed == 0) {
  177.     c_blockState = Trix::STARTED;
  178.   }
  179. }
  180. /**
  181.  *
  182.  */
  183. void Trix::execREAD_NODESREF(Signal* signal)
  184. {
  185.   // NYI
  186. }
  187. /**
  188.  *
  189.  */
  190. void Trix::execNODE_FAILREP(Signal* signal)
  191. {
  192.   jamEntry();
  193.   NodeFailRep * const  nodeFail = (NodeFailRep *) signal->getDataPtr();
  194.   //Uint32 failureNr    = nodeFail->failNo;
  195.   //Uint32 numberNodes  = nodeFail->noOfNodes;
  196.   Uint32 masterNodeId = nodeFail->masterNodeId;
  197.   NodeRecPtr nodeRecPtr;
  198.   for(c_theNodes.first(nodeRecPtr); 
  199.       nodeRecPtr.i != RNIL; 
  200.       c_theNodes.next(nodeRecPtr)) {
  201.     if(NodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) {
  202.       nodeRecPtr.p->alive = false;
  203.       c_noNodesFailed++;
  204.       c_noActiveNodes--;      
  205.     }
  206.   }
  207.   if (c_masterNodeId != masterNodeId) {
  208.     c_masterNodeId = masterNodeId;
  209.     NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId);
  210.     c_masterTrixRef = nodeRec->trixRef;
  211.   }
  212. }
  213. /**
  214.  *
  215.  */
  216. void Trix::execINCL_NODEREQ(Signal* signal)
  217. {
  218.   jamEntry();
  219.   UintR node_id = signal->theData[1];
  220.   NodeRecord* nodeRec = c_theNodes.getPtr(node_id);
  221.   nodeRec->alive = true;
  222.   c_noNodesFailed--;
  223.   c_noActiveNodes++;      
  224.   nodeRec->trixRef = calcTrixBlockRef(node_id);
  225.   if (c_noNodesFailed == 0) {
  226.     c_blockState = Trix::STARTED;
  227.   }  
  228. }
  229. // Debugging
  230. void
  231. Trix::execDUMP_STATE_ORD(Signal* signal)
  232. {
  233.   jamEntry();
  234.   DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr();
  235.   switch(dumpStateOrd->args[0]) {
  236.   case(300): {// ok
  237.     // index2 -T; index2 -I -n10000; index2 -c
  238.     // all dump 300 0 0 0 0 0 4 2
  239.     // select_count INDEX0000
  240.     BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
  241.     
  242.     MEMCOPY_NO_WORDS(buildIndxReq, 
  243.      signal->theData + 1, 
  244.      BuildIndxReq::SignalLength);
  245.     buildIndxReq->setUserRef(reference()); // return to me
  246.     buildIndxReq->setParallelism(10);
  247.     Uint32 indexColumns[1] = {1};
  248.     Uint32 keyColumns[1] = {0};
  249.     struct LinearSectionPtr orderPtr[2];
  250.     buildIndxReq->setColumnOrder(indexColumns, 1, keyColumns, 1, orderPtr);
  251.     sendSignal(reference(),
  252.        GSN_BUILDINDXREQ,
  253.        signal,
  254.        BuildIndxReq::SignalLength,
  255.        JBB,
  256.        orderPtr,
  257.        BuildIndxReq::NoOfSections);
  258.     break;
  259.   }
  260.   case(301): { // ok
  261.     // index2 -T; index2 -I -n10000; index2 -c -p
  262.     // all dump 301 0 0 0 0 0 4 2
  263.     // select_count INDEX0000
  264.     BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
  265.     
  266.     MEMCOPY_NO_WORDS(buildIndxReq, 
  267.      signal->theData + 1, 
  268.      BuildIndxReq::SignalLength);
  269.     buildIndxReq->setUserRef(reference()); // return to me
  270.     buildIndxReq->setParallelism(10);
  271.     Uint32 indexColumns[2] = {0, 1};
  272.     Uint32 keyColumns[1] = {0};
  273.     struct LinearSectionPtr orderPtr[2];
  274.     buildIndxReq->setColumnOrder(indexColumns, 2, keyColumns, 1, orderPtr);
  275.     sendSignal(reference(),
  276.        GSN_BUILDINDXREQ,
  277.        signal,
  278.        BuildIndxReq::SignalLength,
  279.        JBB,
  280.        orderPtr,
  281.        BuildIndxReq::NoOfSections);
  282.     break;
  283.   }
  284.   case(302): { // ok
  285.     // index -T; index -I -n1000; index -c -p
  286.     // all dump 302 0 0 0 0 0 4 2
  287.     // select_count PNUMINDEX0000
  288.     BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
  289.     
  290.     MEMCOPY_NO_WORDS(buildIndxReq, 
  291.      signal->theData + 1, 
  292.      BuildIndxReq::SignalLength);
  293.     buildIndxReq->setUserRef(reference()); // return to me
  294.     buildIndxReq->setParallelism(10);
  295.     Uint32 indexColumns[3] = {0, 3, 5};
  296.     Uint32 keyColumns[1] = {0};
  297.     struct LinearSectionPtr orderPtr[2];
  298.     buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr);
  299.     sendSignal(reference(),
  300.        GSN_BUILDINDXREQ,
  301.        signal,
  302.        BuildIndxReq::SignalLength,
  303.        JBB,
  304.        orderPtr,
  305.        BuildIndxReq::NoOfSections);
  306.     break;
  307.   }
  308.   case(303): { // ok
  309.     // index -T -2; index -I -2 -n1000; index -c -p
  310.     // all dump 303 0 0 0 0 0 4 2
  311.     // select_count PNUMINDEX0000
  312.     BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
  313.     
  314.     MEMCOPY_NO_WORDS(buildIndxReq, 
  315.      signal->theData + 1, 
  316.      BuildIndxReq::SignalLength);
  317.     buildIndxReq->setUserRef(reference()); // return to me
  318.     buildIndxReq->setParallelism(10);
  319.     Uint32 indexColumns[3] = {0, 3, 5};
  320.     Uint32 keyColumns[2] = {0, 1};
  321.     struct LinearSectionPtr orderPtr[2];
  322.     buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr);
  323.     sendSignal(reference(),
  324.        GSN_BUILDINDXREQ,
  325.        signal,
  326.        BuildIndxReq::SignalLength,
  327.        JBB,
  328.        orderPtr,
  329.        BuildIndxReq::NoOfSections);
  330.     break;
  331.   }
  332.   case(304): { // ok
  333.     // index -T -L; index -I -L -n1000; index -c -p
  334.     // all dump 304 0 0 0 0 0 4 2
  335.     // select_count PNUMINDEX0000
  336.     BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
  337.     
  338.     MEMCOPY_NO_WORDS(buildIndxReq, 
  339.      signal->theData + 1, 
  340.      BuildIndxReq::SignalLength);
  341.     buildIndxReq->setUserRef(reference()); // return to me
  342.     buildIndxReq->setParallelism(10);
  343.     Uint32 indexColumns[3] = {0, 3, 5};
  344.     Uint32 keyColumns[1] = {0};
  345.     struct LinearSectionPtr orderPtr[2];
  346.     buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr);
  347.     sendSignal(reference(),
  348.        GSN_BUILDINDXREQ,
  349.        signal,
  350.        BuildIndxReq::SignalLength,
  351.        JBB,
  352.        orderPtr,
  353.        BuildIndxReq::NoOfSections);
  354.     break;
  355.   }
  356.   case(305): { // ok
  357.     // index -T -2 -L; index -I -2 -L -n1000; index -c -p
  358.     // all dump 305 0 0 0 0 0 4 2
  359.     // select_count PNUMINDEX0000
  360.     BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
  361.     
  362.     MEMCOPY_NO_WORDS(buildIndxReq, 
  363.      signal->theData + 1, 
  364.      BuildIndxReq::SignalLength);
  365.     buildIndxReq->setUserRef(reference()); // return to me
  366.     buildIndxReq->setParallelism(10);
  367.     Uint32 indexColumns[3] = {0, 3, 5};
  368.     Uint32 keyColumns[2] = {0, 1};
  369.     struct LinearSectionPtr orderPtr[2];
  370.     buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr);
  371.     sendSignal(reference(),
  372.        GSN_BUILDINDXREQ,
  373.        signal,
  374.        BuildIndxReq::SignalLength,
  375.        JBB,
  376.        orderPtr,
  377.        BuildIndxReq::NoOfSections);
  378.     break;
  379.   }
  380.   default: {
  381.     // Ignore
  382.   }
  383.   }
  384. }
  385. // Build index
  386. /**
  387.  *
  388.  */
  389. void Trix:: execBUILDINDXREQ(Signal* signal)
  390. {
  391.   jamEntry();
  392.   BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtr();
  393.   // Seize a subscription record
  394.   SubscriptionRecPtr subRecPtr;
  395.   SubscriptionRecord* subRec;
  396.   
  397.   if (!c_theSubscriptions.seizeId(subRecPtr, buildIndxReq->getBuildId())) {
  398.     // Failed to allocate subscription record
  399.     BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend();
  400.     buildIndxRef->setErrorCode(BuildIndxRef::AllocationFailure);
  401.     releaseSections(signal);
  402.     sendSignal(buildIndxReq->getUserRef(), 
  403.        GSN_BUILDINDXREF, signal, BuildIndxRef::SignalLength, JBB);
  404.     return;
  405.   }
  406.   subRec = subRecPtr.p;
  407.   subRec->errorCode = BuildIndxRef::NoError;
  408.   subRec->userReference = buildIndxReq->getUserRef();
  409.   subRec->connectionPtr = buildIndxReq->getConnectionPtr();
  410.   subRec->subscriptionId = buildIndxReq->getBuildId();
  411.   subRec->subscriptionKey = buildIndxReq->getBuildKey();
  412.   subRec->indexType = buildIndxReq->getIndexType();
  413.   subRec->sourceTableId = buildIndxReq->getTableId();
  414.   subRec->targetTableId = buildIndxReq->getIndexId();
  415.   subRec->parallelism = buildIndxReq->getParallelism();
  416.   subRec->expectedConf = 0;
  417.   subRec->subscriptionCreated = false;
  418.   subRec->pendingSubSyncContinueConf = false;
  419.   subRec->prepareId = RNIL;
  420.   // Get column order segments
  421.   Uint32 noOfSections = signal->getNoOfSections();
  422.   if(noOfSections > 0) {
  423.     SegmentedSectionPtr ptr;
  424.     signal->getSection(ptr, BuildIndxReq::INDEX_COLUMNS);
  425.     append(subRec->attributeOrder, ptr, getSectionSegmentPool());
  426.     subRec->noOfIndexColumns = ptr.sz;
  427.   }
  428.   if(noOfSections > 1) {
  429.     SegmentedSectionPtr ptr;
  430.     signal->getSection(ptr, BuildIndxReq::KEY_COLUMNS);
  431.     append(subRec->attributeOrder, ptr, getSectionSegmentPool());
  432.     subRec->noOfKeyColumns = ptr.sz;
  433.   }
  434. #if 0
  435.   // Debugging
  436.   printf("Trix:: execBUILDINDXREQ: Attribute order:n");
  437.   subRec->attributeOrder.print(stdout);
  438. #endif
  439.   releaseSections(signal);
  440.   prepareInsertTransactions(signal, subRecPtr);
  441. }
  442. void Trix:: execBUILDINDXCONF(Signal* signal)
  443. {
  444.   printf("Trix:: execBUILDINDXCONFn");
  445. }
  446. void Trix:: execBUILDINDXREF(Signal* signal)
  447. {
  448.   printf("Trix:: execBUILDINDXREFn");
  449. }
  450. void Trix::execUTIL_PREPARE_CONF(Signal* signal)
  451. {
  452.   jamEntry();
  453.   UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr();
  454.   SubscriptionRecPtr subRecPtr;
  455.   SubscriptionRecord* subRec;
  456.   subRecPtr.i = utilPrepareConf->senderData;
  457.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  458.     printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %un", subRecPtr.i);
  459.     return;
  460.   }
  461.   subRecPtr.p = subRec;
  462.   subRec->prepareId = utilPrepareConf->prepareId;
  463.   setupSubscription(signal, subRecPtr);
  464. }
  465. void Trix::execUTIL_PREPARE_REF(Signal* signal)
  466. {
  467.   jamEntry();
  468.   UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr();
  469.   SubscriptionRecPtr subRecPtr;
  470.   SubscriptionRecord* subRec;
  471.   subRecPtr.i = utilPrepareRef->senderData;
  472.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  473.     printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %un", subRecPtr.i);
  474.     return;
  475.   }
  476.   subRecPtr.p = subRec;
  477.   subRec->errorCode = BuildIndxRef::InternalError;
  478. }
  479. void Trix::execUTIL_EXECUTE_CONF(Signal* signal)
  480. {
  481.   jamEntry();
  482.   UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr();
  483.   SubscriptionRecPtr subRecPtr;
  484.   SubscriptionRecord* subRec;
  485.   subRecPtr.i = utilExecuteConf->senderData;
  486.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  487.     printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %un", subRecPtr.i);
  488.     return;
  489.   }
  490.   subRecPtr.p = subRec;
  491.   subRec->expectedConf--;
  492.   checkParallelism(signal, subRec);
  493.   if (subRec->expectedConf == 0)
  494.     buildComplete(signal, subRecPtr);
  495. }
  496. void Trix::execUTIL_EXECUTE_REF(Signal* signal)
  497. {
  498.   jamEntry();
  499.   UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr();
  500.   SubscriptionRecPtr subRecPtr;
  501.   SubscriptionRecord* subRec;
  502.   subRecPtr.i = utilExecuteRef->senderData;
  503.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  504.     printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %un", subRecPtr.i);
  505.     return;
  506.   }
  507.   subRecPtr.p = subRec;
  508.   ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
  509.   if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
  510.     buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique);
  511.   else
  512.     buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
  513. }
  514. void Trix::execSUB_CREATE_CONF(Signal* signal)
  515. {
  516.   jamEntry();
  517.   SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr();
  518.   SubscriptionRecPtr subRecPtr;
  519.   SubscriptionRecord* subRec;
  520.   subRecPtr.i = subCreateConf->subscriberData;
  521.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  522.     printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %un", subRecPtr.i);
  523.     return;
  524.   }
  525.   ndbrequire(subRec->subscriptionId == subCreateConf->subscriptionId);
  526.   ndbrequire(subRec->subscriptionKey == subCreateConf->subscriptionKey);
  527.   subRec->subscriptionCreated = true;
  528.   subRecPtr.p = subRec;
  529.   setupTableScan(signal, subRecPtr);
  530. }
  531. void Trix::execSUB_CREATE_REF(Signal* signal)
  532. {
  533.   jamEntry();
  534.   // THIS SIGNAL IS NEVER SENT FROM SUMA?
  535.   /*
  536.   SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr();
  537.   SubscriptionRecPtr subRecPtr;
  538.   SubscriptionRecord* subRec;
  539.   subRecPtr.i = subCreateRef->subscriberData;
  540.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  541.     printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %un", subRecPtr.i);
  542.     return;
  543.   }
  544.   subRecPtr.p = subRec;
  545.   buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
  546.   */
  547. }
  548. void Trix::execSUB_SYNC_CONF(Signal* signal)
  549. {
  550.   jamEntry();
  551.   SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr();
  552.   SubscriptionRecPtr subRecPtr;
  553.   SubscriptionRecord* subRec;
  554.   
  555.   subRecPtr.i = subSyncConf->subscriberData;
  556.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  557.     printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %un", subRecPtr.i);
  558.     return;
  559.   }
  560.   ndbrequire(subRec->subscriptionId == subSyncConf->subscriptionId);
  561.   ndbrequire(subRec->subscriptionKey == subSyncConf->subscriptionKey);
  562.   subRecPtr.p = subRec;
  563.   if(subSyncConf->part == SubscriptionData::MetaData)
  564.     startTableScan(signal, subRecPtr);
  565.   else {
  566.     subRec->expectedConf--;
  567.     checkParallelism(signal, subRec);
  568.     if (subRec->expectedConf == 0)
  569.       buildComplete(signal, subRecPtr);
  570.   }
  571. }
  572. void Trix::execSUB_SYNC_REF(Signal* signal)
  573. {
  574.   jamEntry();
  575.   SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr();
  576.   SubscriptionRecPtr subRecPtr;
  577.   SubscriptionRecord* subRec;
  578.   subRecPtr.i = subSyncRef->subscriberData;
  579.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  580.     printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %un", subRecPtr.i);
  581.     return;
  582.   }
  583.   subRecPtr.p = subRec;
  584.   buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
  585. }
  586. void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
  587. {
  588.   SubSyncContinueReq  * subSyncContinueReq = 
  589.     (SubSyncContinueReq *) signal->getDataPtr();
  590.   
  591.   SubscriptionRecPtr subRecPtr;
  592.   SubscriptionRecord* subRec;
  593.   subRecPtr.i = subSyncContinueReq->subscriberData;
  594.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  595.     printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %un", subRecPtr.i);
  596.     return;
  597.   }
  598.   subRecPtr.p = subRec;
  599.   subRec->pendingSubSyncContinueConf = true;
  600.   checkParallelism(signal, subRec);
  601. }
  602. void Trix::execSUB_META_DATA(Signal* signal)
  603. {
  604.   jamEntry();
  605. }
  606. void Trix::execSUB_TABLE_DATA(Signal* signal)
  607. {
  608.   jamEntry();
  609.   SubTableData * subTableData = (SubTableData *)signal->getDataPtr();
  610.   SubscriptionRecPtr subRecPtr;
  611.   SubscriptionRecord* subRec;
  612.   subRecPtr.i = subTableData->subscriberData;
  613.   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
  614.     printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %un", subRecPtr.i);
  615.     return;
  616.   }
  617.   subRecPtr.p = subRec;
  618.   SegmentedSectionPtr headerPtr, dataPtr;
  619.   if (!signal->getSection(headerPtr, 0)) {
  620.     printf("Trix::execSUB_TABLE_DATA: Failed to get header sectionn");
  621.   }
  622.   if (!signal->getSection(dataPtr, 1)) {
  623.     printf("Trix::execSUB_TABLE_DATA: Failed to get data sectionn");
  624.   }
  625.   executeInsertTransaction(signal, subRecPtr, headerPtr, dataPtr);
  626. }
  627. void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr)
  628. {
  629.   Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
  630.   SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
  631.   SubscriptionRecord* subRec = subRecPtr.p;
  632. //  Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;
  633.   AttrOrderBuffer::DataBufferIterator iter;
  634.   Uint32 i = 0;
  635.   jam();
  636.   bool moreAttributes = subRec->attributeOrder.first(iter);
  637.   while (moreAttributes) {
  638.     attributeList[i++] = *iter.data;
  639.     moreAttributes = subRec->attributeOrder.next(iter);
  640.   }
  641.   // Merge index and key column segments
  642.   struct LinearSectionPtr orderPtr[3];
  643.   orderPtr[0].p = attributeList;
  644.   orderPtr[0].sz = subRec->attributeOrder.getSize();
  645.   subCreateReq->subscriberRef = reference();
  646.   subCreateReq->subscriberData = subRecPtr.i;
  647.   subCreateReq->subscriptionId = subRec->subscriptionId;
  648.   subCreateReq->subscriptionKey = subRec->subscriptionKey;
  649.   subCreateReq->tableId = subRec->sourceTableId;
  650.   subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
  651.   
  652.   sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ, 
  653.      signal, SubCreateReq::SignalLength+1, JBB, orderPtr, 1);
  654. }
  655. void Trix::setupTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
  656. {
  657.   SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
  658.   jam();
  659.   subSyncReq->subscriptionId = subRecPtr.i;
  660.   subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
  661.   subSyncReq->part = SubscriptionData::MetaData;
  662.   sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, 
  663.      signal, SubSyncReq::SignalLength, JBB);
  664. }
  665. void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
  666. {
  667.   jam();
  668.   subRecPtr.p->expectedConf = 1;
  669.   SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
  670.   subSyncReq->subscriptionId = subRecPtr.i;
  671.   subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
  672.   subSyncReq->part = SubscriptionData::TableData;
  673.   sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, 
  674.      signal, SubSyncReq::SignalLength, JBB);
  675. }
  676. void Trix::prepareInsertTransactions(Signal* signal, 
  677.      SubscriptionRecPtr subRecPtr)
  678. {
  679.   SubscriptionRecord* subRec = subRecPtr.p;
  680.   UtilPrepareReq * utilPrepareReq = 
  681.     (UtilPrepareReq *)signal->getDataPtrSend();
  682.   
  683.   jam();
  684.   utilPrepareReq->senderRef = reference();
  685.   utilPrepareReq->senderData = subRecPtr.i;
  686.   const Uint32 pageSizeInWords = 128;
  687.   Uint32 propPage[pageSizeInWords];
  688.   LinearWriter w(&propPage[0],128);
  689.   w.first();
  690.   w.add(UtilPrepareReq::NoOfOperations, 1);
  691.   w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
  692.   w.add(UtilPrepareReq::TableId, subRec->targetTableId);
  693.   // Add index attributes in increasing order and one PK attribute
  694.   for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)
  695.     w.add(UtilPrepareReq::AttributeId, i);
  696. #if 0
  697.   // Debugging
  698.   SimplePropertiesLinearReader reader(propPage, w.getWordsUsed());
  699.   printf("Trix::prepareInsertTransactions: Sent SimpleProperties:n");
  700.   reader.printAll(ndbout);
  701. #endif
  702.   struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
  703.   sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
  704.   sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
  705.   sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
  706.      UtilPrepareReq::SignalLength, JBB, 
  707.      sectionsPtr, UtilPrepareReq::NoOfSections);
  708. }
  709. void Trix::executeInsertTransaction(Signal* signal, 
  710.     SubscriptionRecPtr subRecPtr,
  711.     SegmentedSectionPtr headerPtr,
  712.     SegmentedSectionPtr dataPtr)
  713. {
  714.   jam();
  715.   SubscriptionRecord* subRec = subRecPtr.p;
  716.   UtilExecuteReq * utilExecuteReq = 
  717.     (UtilExecuteReq *)signal->getDataPtrSend();
  718.   Uint32* headerBuffer = signal->theData + 25;
  719.   Uint32*  dataBuffer = headerBuffer + headerPtr.sz;
  720.   utilExecuteReq->senderRef = reference();
  721.   utilExecuteReq->senderData = subRecPtr.i;
  722.   utilExecuteReq->prepareId = subRec->prepareId;
  723. #if 0
  724.   printf("Header size %un", headerPtr.sz);
  725.   for(int i = 0; i < headerPtr.sz; i++)
  726.     printf("H'%.8x ", headerBuffer[i]);
  727.   printf("n");
  728.   
  729.   printf("Data size %un", dataPtr.sz);
  730.   for(int i = 0; i < dataPtr.sz; i++)
  731.     printf("H'%.8x ", dataBuffer[i]);
  732.   printf("n");
  733. #endif
  734.   // Save scan result in linear buffers
  735.   copy(headerBuffer, headerPtr);
  736.   copy(dataBuffer, dataPtr);
  737.   // Calculate packed key size
  738.   Uint32 noOfKeyData = 0;
  739.   for(Uint32 i = 0; i < headerPtr.sz; i++) {
  740.     AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
  741.     // Filter out NULL attributes
  742.     if (keyAttrHead->isNULL())
  743.       return;
  744.     if (i < subRec->noOfIndexColumns)
  745.       // Renumber index attributes in consequtive order
  746.       keyAttrHead->setAttributeId(i);
  747.     else
  748.       // Calculate total size of PK attribute
  749.       noOfKeyData += keyAttrHead->getDataSize();
  750.   }
  751.   // Increase expected CONF count
  752.   subRec->expectedConf++;
  753.   // Pack key attributes
  754.   AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns,
  755. subRec->noOfIndexColumns,
  756. noOfKeyData);
  757.   struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
  758.   sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
  759.   sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz = 
  760.     subRec->noOfIndexColumns + 1;
  761.   sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
  762.   sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
  763.   sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
  764.      UtilExecuteReq::SignalLength, JBB,
  765.      sectionsPtr, UtilExecuteReq::NoOfSections);
  766. }
  767. void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr)
  768. {
  769.   SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend();
  770.   req->senderRef       = reference();
  771.   req->senderData      = subRecPtr.i;
  772.   req->subscriptionId  = subRecPtr.p->subscriptionId;
  773.   req->subscriptionKey = subRecPtr.p->subscriptionKey;
  774.   sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
  775.      SubRemoveReq::SignalLength, JBB);
  776. }
  777. void Trix::buildFailed(Signal* signal, 
  778.        SubscriptionRecPtr subRecPtr, 
  779.        BuildIndxRef::ErrorCode errorCode)
  780. {
  781.   SubscriptionRecord* subRec = subRecPtr.p;
  782.   subRec->errorCode = errorCode;
  783.   // Continue accumulating since we currently cannot stop SUMA
  784.   subRec->expectedConf--;
  785.   checkParallelism(signal, subRec);
  786.   if (subRec->expectedConf == 0)
  787.     buildComplete(signal, subRecPtr);
  788. }
  789. void
  790. Trix::execSUB_REMOVE_REF(Signal* signal){
  791.   jamEntry();
  792.   //@todo
  793.   ndbrequire(false);
  794. }
  795. void
  796. Trix::execSUB_REMOVE_CONF(Signal* signal){
  797.   jamEntry();
  798.   SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
  799.   SubscriptionRecPtr subRecPtr;
  800.   c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
  801.   if(subRecPtr.p->prepareId != RNIL){
  802.     jam();
  803.     UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
  804.     req->prepareId = subRecPtr.p->prepareId;
  805.     req->senderData = subRecPtr.i;
  806.     sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal, 
  807.        UtilReleaseReq::SignalLength , JBB);  
  808.     return;
  809.   }
  810.   
  811.   {
  812.     UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
  813.     conf->senderData = subRecPtr.i;
  814.     execUTIL_RELEASE_CONF(signal);
  815.   }
  816. }
  817. void
  818. Trix::execUTIL_RELEASE_REF(Signal* signal){
  819.   jamEntry();
  820.   ndbrequire(false);
  821. }
  822. void
  823. Trix::execUTIL_RELEASE_CONF(Signal* signal){
  824.   
  825.   UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
  826.   
  827.   SubscriptionRecPtr subRecPtr;
  828.   c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
  829.   
  830.   if(subRecPtr.p->errorCode == BuildIndxRef::NoError){
  831.     // Build is complete, reply to original sender
  832.     BuildIndxConf * buildIndxConf = (BuildIndxConf *)signal->getDataPtrSend();
  833.     buildIndxConf->setUserRef(subRecPtr.p->userReference);
  834.     buildIndxConf->setConnectionPtr(subRecPtr.p->connectionPtr);
  835.     buildIndxConf->setRequestType(BuildIndxReq::RT_TRIX);
  836.     buildIndxConf->setIndexType(subRecPtr.p->indexType);
  837.     buildIndxConf->setTableId(subRecPtr.p->sourceTableId);
  838.     buildIndxConf->setIndexId(subRecPtr.p->targetTableId);
  839.     
  840.     sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXCONF, signal, 
  841.        BuildIndxConf::SignalLength , JBB);  
  842.   } else {
  843.     // Build failed, reply to original sender
  844.     BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend();
  845.     buildIndxRef->setUserRef(subRecPtr.p->userReference);
  846.     buildIndxRef->setConnectionPtr(subRecPtr.p->connectionPtr);
  847.     buildIndxRef->setRequestType(BuildIndxReq::RT_TRIX);
  848.     buildIndxRef->setIndexType(subRecPtr.p->indexType);
  849.     buildIndxRef->setTableId(subRecPtr.p->sourceTableId);
  850.     buildIndxRef->setIndexId(subRecPtr.p->targetTableId);
  851.     buildIndxRef->setErrorCode(subRecPtr.p->errorCode);
  852.     
  853.     sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXREF, signal, 
  854.        BuildIndxRef::SignalLength , JBB);  
  855.   }
  856.   // Release subscription record
  857.   subRecPtr.p->attributeOrder.release();
  858.   c_theSubscriptions.release(subRecPtr.i);
  859. }
  860. void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec)
  861. {
  862.   if ((subRec->pendingSubSyncContinueConf) &&
  863.       (subRec->expectedConf < subRec->parallelism)) {
  864.     SubSyncContinueConf  * subSyncContinueConf = 
  865.       (SubSyncContinueConf *) signal->getDataPtrSend();
  866.     subSyncContinueConf->subscriptionId = subRec->subscriptionId;
  867.     subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
  868.     sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, 
  869.        SubSyncContinueConf::SignalLength , JBB);  
  870.     subRec->pendingSubSyncContinueConf = false;
  871.   }
  872. }
  873. BLOCK_FUNCTIONS(Trix)
  874. template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);