Trix.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:30k
- /* Copyright (C) 2003 MySQL AB
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
- #include "Trix.hpp"
- #include <string.h>
- #include <kernel_types.h>
- #include <NdbOut.hpp>
- #include <signaldata/ReadNodesConf.hpp>
- #include <signaldata/NodeFailRep.hpp>
- #include <signaldata/DumpStateOrd.hpp>
- #include <signaldata/GetTabInfo.hpp>
- #include <signaldata/DictTabInfo.hpp>
- #include <signaldata/BuildIndx.hpp>
- #include <signaldata/SumaImpl.hpp>
- #include <signaldata/UtilPrepare.hpp>
- #include <signaldata/UtilExecute.hpp>
- #include <signaldata/UtilRelease.hpp>
- #include <SectionReader.hpp>
- #include <AttributeHeader.hpp>
- #define CONSTRAINT_VIOLATION 893
- #define DEBUG(x) { ndbout << "TRIX::" << x << endl; }
- /**
- *
- */
- Trix::Trix(const Configuration & conf) :
- SimulatedBlock(TRIX, conf),
- c_theNodes(c_theNodeRecPool),
- c_masterNodeId(0),
- c_masterTrixRef(0),
- c_noNodesFailed(0),
- c_noActiveNodes(0),
- c_theSubscriptions(c_theSubscriptionRecPool)
- {
- BLOCK_CONSTRUCTOR(Trix);
- // Add received signals
- addRecSignal(GSN_STTOR, &Trix::execSTTOR);
- addRecSignal(GSN_NDB_STTOR, &Trix::execNDB_STTOR); // Forwarded from DICT
- addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF);
- addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF);
- addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP);
- addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ);
- addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD);
- // Index build
- addRecSignal(GSN_BUILDINDXREQ, &Trix::execBUILDINDXREQ);
- // Dump testing
- addRecSignal(GSN_BUILDINDXCONF, &Trix::execBUILDINDXCONF);
- addRecSignal(GSN_BUILDINDXREF, &Trix::execBUILDINDXREF);
- addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF);
- addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF);
- addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF);
- addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF);
- addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF);
- addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF);
- // Suma signals
- addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF);
- addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF);
- addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF);
- addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF);
- addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
- addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
- addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
- addRecSignal(GSN_SUB_META_DATA, &Trix::execSUB_META_DATA);
- addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);
- // Allocate pool sizes
- c_theAttrOrderBufferPool.setSize(100);
- c_theSubscriptionRecPool.setSize(100);
- ArrayList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool);
- SubscriptionRecPtr subptr;
- while(subscriptions.seize(subptr) == true) {
- new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool);
- }
- subscriptions.release();
- }
- /**
- *
- */
- Trix::~Trix()
- {
- }
- /**
- *
- */
- void Trix::execSTTOR(Signal* signal)
- {
- jamEntry();
- //const Uint32 startphase = signal->theData[1];
- const Uint32 theSignalKey = signal->theData[6];
-
- signal->theData[0] = theSignalKey;
- signal->theData[3] = 1;
- signal->theData[4] = 255; // No more start phases from missra
- sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
- return;
- }//Trix::execSTTOR()
- /**
- *
- */
- void Trix::execNDB_STTOR(Signal* signal)
- {
- jamEntry();
- BlockReference ndbcntrRef = signal->theData[0];
- Uint16 startphase = signal->theData[2]; /* RESTART PHASE */
- Uint16 mynode = signal->theData[1];
- //Uint16 restarttype = signal->theData[3];
- //UintR configInfo1 = signal->theData[6]; /* CONFIGRATION INFO PART 1 */
- //UintR configInfo2 = signal->theData[7]; /* CONFIGRATION INFO PART 2 */
- switch (startphase) {
- case 3:
- jam();
- /* SYMBOLIC START PHASE 4 */
- /* ABSOLUTE PHASE 5 */
- /* REQUEST NODE IDENTITIES FROM DBDIH */
- signal->theData[0] = calcTrixBlockRef(mynode);
- sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB);
- return;
- break;
- case 6:
- break;
- default:
- break;
- }
- }
- /**
- *
- */
- void Trix::execREAD_NODESCONF(Signal* signal)
- {
- jamEntry();
- ReadNodesConf * const readNodes = (ReadNodesConf *)signal->getDataPtr();
- //Uint32 noOfNodes = readNodes->noOfNodes;
- NodeRecPtr nodeRecPtr;
- c_masterNodeId = readNodes->masterNodeId;
- c_masterTrixRef = RNIL;
- c_noNodesFailed = 0;
- for(unsigned i = 0; i < MAX_NDB_NODES; i++) {
- jam();
- if(NodeBitmask::get(readNodes->allNodes, i)) {
- // Node is defined
- jam();
- ndbrequire(c_theNodes.seizeId(nodeRecPtr, i));
- nodeRecPtr.p->trixRef = calcTrixBlockRef(i);
- if (i == c_masterNodeId) {
- c_masterTrixRef = nodeRecPtr.p->trixRef;
- }
- if(NodeBitmask::get(readNodes->inactiveNodes, i)){
- // Node is not active
- jam();
- /**-----------------------------------------------------------------
- * THIS NODE IS DEFINED IN THE CLUSTER BUT IS NOT ALIVE CURRENTLY.
- * WE ADD THE NODE TO THE SET OF FAILED NODES AND ALSO SET THE
- * BLOCKSTATE TO BUSY TO AVOID ADDING TRIGGERS OR INDEXES WHILE
- * NOT ALL NODES ARE ALIVE.
- *------------------------------------------------------------------*/
- arrGuard(c_noNodesFailed, MAX_NDB_NODES);
- nodeRecPtr.p->alive = false;
- c_noNodesFailed++;
- c_blockState = Trix::NODE_FAILURE;
- }
- else {
- // Node is active
- jam();
- c_noActiveNodes++;
- nodeRecPtr.p->alive = true;
- }
- }
- }
- if (c_noNodesFailed == 0) {
- c_blockState = Trix::STARTED;
- }
- }
- /**
- *
- */
- void Trix::execREAD_NODESREF(Signal* signal)
- {
- // NYI
- }
- /**
- *
- */
- void Trix::execNODE_FAILREP(Signal* signal)
- {
- jamEntry();
- NodeFailRep * const nodeFail = (NodeFailRep *) signal->getDataPtr();
- //Uint32 failureNr = nodeFail->failNo;
- //Uint32 numberNodes = nodeFail->noOfNodes;
- Uint32 masterNodeId = nodeFail->masterNodeId;
- NodeRecPtr nodeRecPtr;
- for(c_theNodes.first(nodeRecPtr);
- nodeRecPtr.i != RNIL;
- c_theNodes.next(nodeRecPtr)) {
- if(NodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) {
- nodeRecPtr.p->alive = false;
- c_noNodesFailed++;
- c_noActiveNodes--;
- }
- }
- if (c_masterNodeId != masterNodeId) {
- c_masterNodeId = masterNodeId;
- NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId);
- c_masterTrixRef = nodeRec->trixRef;
- }
- }
- /**
- *
- */
- void Trix::execINCL_NODEREQ(Signal* signal)
- {
- jamEntry();
- UintR node_id = signal->theData[1];
- NodeRecord* nodeRec = c_theNodes.getPtr(node_id);
- nodeRec->alive = true;
- c_noNodesFailed--;
- c_noActiveNodes++;
- nodeRec->trixRef = calcTrixBlockRef(node_id);
- if (c_noNodesFailed == 0) {
- c_blockState = Trix::STARTED;
- }
- }
- // Debugging
- void
- Trix::execDUMP_STATE_ORD(Signal* signal)
- {
- jamEntry();
- DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr();
- switch(dumpStateOrd->args[0]) {
- case(300): {// ok
- // index2 -T; index2 -I -n10000; index2 -c
- // all dump 300 0 0 0 0 0 4 2
- // select_count INDEX0000
- BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
-
- MEMCOPY_NO_WORDS(buildIndxReq,
- signal->theData + 1,
- BuildIndxReq::SignalLength);
- buildIndxReq->setUserRef(reference()); // return to me
- buildIndxReq->setParallelism(10);
- Uint32 indexColumns[1] = {1};
- Uint32 keyColumns[1] = {0};
- struct LinearSectionPtr orderPtr[2];
- buildIndxReq->setColumnOrder(indexColumns, 1, keyColumns, 1, orderPtr);
- sendSignal(reference(),
- GSN_BUILDINDXREQ,
- signal,
- BuildIndxReq::SignalLength,
- JBB,
- orderPtr,
- BuildIndxReq::NoOfSections);
- break;
- }
- case(301): { // ok
- // index2 -T; index2 -I -n10000; index2 -c -p
- // all dump 301 0 0 0 0 0 4 2
- // select_count INDEX0000
- BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
-
- MEMCOPY_NO_WORDS(buildIndxReq,
- signal->theData + 1,
- BuildIndxReq::SignalLength);
- buildIndxReq->setUserRef(reference()); // return to me
- buildIndxReq->setParallelism(10);
- Uint32 indexColumns[2] = {0, 1};
- Uint32 keyColumns[1] = {0};
- struct LinearSectionPtr orderPtr[2];
- buildIndxReq->setColumnOrder(indexColumns, 2, keyColumns, 1, orderPtr);
- sendSignal(reference(),
- GSN_BUILDINDXREQ,
- signal,
- BuildIndxReq::SignalLength,
- JBB,
- orderPtr,
- BuildIndxReq::NoOfSections);
- break;
- }
- case(302): { // ok
- // index -T; index -I -n1000; index -c -p
- // all dump 302 0 0 0 0 0 4 2
- // select_count PNUMINDEX0000
- BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
-
- MEMCOPY_NO_WORDS(buildIndxReq,
- signal->theData + 1,
- BuildIndxReq::SignalLength);
- buildIndxReq->setUserRef(reference()); // return to me
- buildIndxReq->setParallelism(10);
- Uint32 indexColumns[3] = {0, 3, 5};
- Uint32 keyColumns[1] = {0};
- struct LinearSectionPtr orderPtr[2];
- buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr);
- sendSignal(reference(),
- GSN_BUILDINDXREQ,
- signal,
- BuildIndxReq::SignalLength,
- JBB,
- orderPtr,
- BuildIndxReq::NoOfSections);
- break;
- }
- case(303): { // ok
- // index -T -2; index -I -2 -n1000; index -c -p
- // all dump 303 0 0 0 0 0 4 2
- // select_count PNUMINDEX0000
- BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
-
- MEMCOPY_NO_WORDS(buildIndxReq,
- signal->theData + 1,
- BuildIndxReq::SignalLength);
- buildIndxReq->setUserRef(reference()); // return to me
- buildIndxReq->setParallelism(10);
- Uint32 indexColumns[3] = {0, 3, 5};
- Uint32 keyColumns[2] = {0, 1};
- struct LinearSectionPtr orderPtr[2];
- buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr);
- sendSignal(reference(),
- GSN_BUILDINDXREQ,
- signal,
- BuildIndxReq::SignalLength,
- JBB,
- orderPtr,
- BuildIndxReq::NoOfSections);
- break;
- }
- case(304): { // ok
- // index -T -L; index -I -L -n1000; index -c -p
- // all dump 304 0 0 0 0 0 4 2
- // select_count PNUMINDEX0000
- BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
-
- MEMCOPY_NO_WORDS(buildIndxReq,
- signal->theData + 1,
- BuildIndxReq::SignalLength);
- buildIndxReq->setUserRef(reference()); // return to me
- buildIndxReq->setParallelism(10);
- Uint32 indexColumns[3] = {0, 3, 5};
- Uint32 keyColumns[1] = {0};
- struct LinearSectionPtr orderPtr[2];
- buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr);
- sendSignal(reference(),
- GSN_BUILDINDXREQ,
- signal,
- BuildIndxReq::SignalLength,
- JBB,
- orderPtr,
- BuildIndxReq::NoOfSections);
- break;
- }
- case(305): { // ok
- // index -T -2 -L; index -I -2 -L -n1000; index -c -p
- // all dump 305 0 0 0 0 0 4 2
- // select_count PNUMINDEX0000
- BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
-
- MEMCOPY_NO_WORDS(buildIndxReq,
- signal->theData + 1,
- BuildIndxReq::SignalLength);
- buildIndxReq->setUserRef(reference()); // return to me
- buildIndxReq->setParallelism(10);
- Uint32 indexColumns[3] = {0, 3, 5};
- Uint32 keyColumns[2] = {0, 1};
- struct LinearSectionPtr orderPtr[2];
- buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr);
- sendSignal(reference(),
- GSN_BUILDINDXREQ,
- signal,
- BuildIndxReq::SignalLength,
- JBB,
- orderPtr,
- BuildIndxReq::NoOfSections);
- break;
- }
- default: {
- // Ignore
- }
- }
- }
- // Build index
- /**
- *
- */
- void Trix:: execBUILDINDXREQ(Signal* signal)
- {
- jamEntry();
- BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtr();
- // Seize a subscription record
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
-
- if (!c_theSubscriptions.seizeId(subRecPtr, buildIndxReq->getBuildId())) {
- // Failed to allocate subscription record
- BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend();
- buildIndxRef->setErrorCode(BuildIndxRef::AllocationFailure);
- releaseSections(signal);
- sendSignal(buildIndxReq->getUserRef(),
- GSN_BUILDINDXREF, signal, BuildIndxRef::SignalLength, JBB);
- return;
- }
- subRec = subRecPtr.p;
- subRec->errorCode = BuildIndxRef::NoError;
- subRec->userReference = buildIndxReq->getUserRef();
- subRec->connectionPtr = buildIndxReq->getConnectionPtr();
- subRec->subscriptionId = buildIndxReq->getBuildId();
- subRec->subscriptionKey = buildIndxReq->getBuildKey();
- subRec->indexType = buildIndxReq->getIndexType();
- subRec->sourceTableId = buildIndxReq->getTableId();
- subRec->targetTableId = buildIndxReq->getIndexId();
- subRec->parallelism = buildIndxReq->getParallelism();
- subRec->expectedConf = 0;
- subRec->subscriptionCreated = false;
- subRec->pendingSubSyncContinueConf = false;
- subRec->prepareId = RNIL;
- // Get column order segments
- Uint32 noOfSections = signal->getNoOfSections();
- if(noOfSections > 0) {
- SegmentedSectionPtr ptr;
- signal->getSection(ptr, BuildIndxReq::INDEX_COLUMNS);
- append(subRec->attributeOrder, ptr, getSectionSegmentPool());
- subRec->noOfIndexColumns = ptr.sz;
- }
- if(noOfSections > 1) {
- SegmentedSectionPtr ptr;
- signal->getSection(ptr, BuildIndxReq::KEY_COLUMNS);
- append(subRec->attributeOrder, ptr, getSectionSegmentPool());
- subRec->noOfKeyColumns = ptr.sz;
- }
- #if 0
- // Debugging
- printf("Trix:: execBUILDINDXREQ: Attribute order:n");
- subRec->attributeOrder.print(stdout);
- #endif
- releaseSections(signal);
- prepareInsertTransactions(signal, subRecPtr);
- }
- void Trix:: execBUILDINDXCONF(Signal* signal)
- {
- printf("Trix:: execBUILDINDXCONFn");
- }
- void Trix:: execBUILDINDXREF(Signal* signal)
- {
- printf("Trix:: execBUILDINDXREFn");
- }
- void Trix::execUTIL_PREPARE_CONF(Signal* signal)
- {
- jamEntry();
- UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr();
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
- subRecPtr.i = utilPrepareConf->senderData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- subRecPtr.p = subRec;
- subRec->prepareId = utilPrepareConf->prepareId;
- setupSubscription(signal, subRecPtr);
- }
- void Trix::execUTIL_PREPARE_REF(Signal* signal)
- {
- jamEntry();
- UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr();
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
- subRecPtr.i = utilPrepareRef->senderData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- subRecPtr.p = subRec;
- subRec->errorCode = BuildIndxRef::InternalError;
- }
- void Trix::execUTIL_EXECUTE_CONF(Signal* signal)
- {
- jamEntry();
- UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr();
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
- subRecPtr.i = utilExecuteConf->senderData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- subRecPtr.p = subRec;
- subRec->expectedConf--;
- checkParallelism(signal, subRec);
- if (subRec->expectedConf == 0)
- buildComplete(signal, subRecPtr);
- }
- void Trix::execUTIL_EXECUTE_REF(Signal* signal)
- {
- jamEntry();
- UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr();
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
- subRecPtr.i = utilExecuteRef->senderData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- subRecPtr.p = subRec;
- ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
- if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
- buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique);
- else
- buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
- }
- void Trix::execSUB_CREATE_CONF(Signal* signal)
- {
- jamEntry();
- SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr();
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
- subRecPtr.i = subCreateConf->subscriberData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- ndbrequire(subRec->subscriptionId == subCreateConf->subscriptionId);
- ndbrequire(subRec->subscriptionKey == subCreateConf->subscriptionKey);
- subRec->subscriptionCreated = true;
- subRecPtr.p = subRec;
- setupTableScan(signal, subRecPtr);
- }
- void Trix::execSUB_CREATE_REF(Signal* signal)
- {
- jamEntry();
- // THIS SIGNAL IS NEVER SENT FROM SUMA?
- /*
- SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr();
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
- subRecPtr.i = subCreateRef->subscriberData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- subRecPtr.p = subRec;
- buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
- */
- }
- void Trix::execSUB_SYNC_CONF(Signal* signal)
- {
- jamEntry();
- SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr();
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
-
- subRecPtr.i = subSyncConf->subscriberData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- ndbrequire(subRec->subscriptionId == subSyncConf->subscriptionId);
- ndbrequire(subRec->subscriptionKey == subSyncConf->subscriptionKey);
- subRecPtr.p = subRec;
- if(subSyncConf->part == SubscriptionData::MetaData)
- startTableScan(signal, subRecPtr);
- else {
- subRec->expectedConf--;
- checkParallelism(signal, subRec);
- if (subRec->expectedConf == 0)
- buildComplete(signal, subRecPtr);
- }
- }
- void Trix::execSUB_SYNC_REF(Signal* signal)
- {
- jamEntry();
- SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr();
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
- subRecPtr.i = subSyncRef->subscriberData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- subRecPtr.p = subRec;
- buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
- }
- void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
- {
- SubSyncContinueReq * subSyncContinueReq =
- (SubSyncContinueReq *) signal->getDataPtr();
-
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
- subRecPtr.i = subSyncContinueReq->subscriberData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- subRecPtr.p = subRec;
- subRec->pendingSubSyncContinueConf = true;
- checkParallelism(signal, subRec);
- }
- void Trix::execSUB_META_DATA(Signal* signal)
- {
- jamEntry();
- }
- void Trix::execSUB_TABLE_DATA(Signal* signal)
- {
- jamEntry();
- SubTableData * subTableData = (SubTableData *)signal->getDataPtr();
- SubscriptionRecPtr subRecPtr;
- SubscriptionRecord* subRec;
- subRecPtr.i = subTableData->subscriberData;
- if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %un", subRecPtr.i);
- return;
- }
- subRecPtr.p = subRec;
- SegmentedSectionPtr headerPtr, dataPtr;
- if (!signal->getSection(headerPtr, 0)) {
- printf("Trix::execSUB_TABLE_DATA: Failed to get header sectionn");
- }
- if (!signal->getSection(dataPtr, 1)) {
- printf("Trix::execSUB_TABLE_DATA: Failed to get data sectionn");
- }
- executeInsertTransaction(signal, subRecPtr, headerPtr, dataPtr);
- }
- void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr)
- {
- Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
- SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
- SubscriptionRecord* subRec = subRecPtr.p;
- // Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;
- AttrOrderBuffer::DataBufferIterator iter;
- Uint32 i = 0;
- jam();
- bool moreAttributes = subRec->attributeOrder.first(iter);
- while (moreAttributes) {
- attributeList[i++] = *iter.data;
- moreAttributes = subRec->attributeOrder.next(iter);
- }
- // Merge index and key column segments
- struct LinearSectionPtr orderPtr[3];
- orderPtr[0].p = attributeList;
- orderPtr[0].sz = subRec->attributeOrder.getSize();
- subCreateReq->subscriberRef = reference();
- subCreateReq->subscriberData = subRecPtr.i;
- subCreateReq->subscriptionId = subRec->subscriptionId;
- subCreateReq->subscriptionKey = subRec->subscriptionKey;
- subCreateReq->tableId = subRec->sourceTableId;
- subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
-
- sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ,
- signal, SubCreateReq::SignalLength+1, JBB, orderPtr, 1);
- }
- void Trix::setupTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
- {
- SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
- jam();
- subSyncReq->subscriptionId = subRecPtr.i;
- subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
- subSyncReq->part = SubscriptionData::MetaData;
- sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
- signal, SubSyncReq::SignalLength, JBB);
- }
- void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
- {
- jam();
- subRecPtr.p->expectedConf = 1;
- SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
- subSyncReq->subscriptionId = subRecPtr.i;
- subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
- subSyncReq->part = SubscriptionData::TableData;
- sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
- signal, SubSyncReq::SignalLength, JBB);
- }
- void Trix::prepareInsertTransactions(Signal* signal,
- SubscriptionRecPtr subRecPtr)
- {
- SubscriptionRecord* subRec = subRecPtr.p;
- UtilPrepareReq * utilPrepareReq =
- (UtilPrepareReq *)signal->getDataPtrSend();
-
- jam();
- utilPrepareReq->senderRef = reference();
- utilPrepareReq->senderData = subRecPtr.i;
- const Uint32 pageSizeInWords = 128;
- Uint32 propPage[pageSizeInWords];
- LinearWriter w(&propPage[0],128);
- w.first();
- w.add(UtilPrepareReq::NoOfOperations, 1);
- w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
- w.add(UtilPrepareReq::TableId, subRec->targetTableId);
- // Add index attributes in increasing order and one PK attribute
- for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)
- w.add(UtilPrepareReq::AttributeId, i);
- #if 0
- // Debugging
- SimplePropertiesLinearReader reader(propPage, w.getWordsUsed());
- printf("Trix::prepareInsertTransactions: Sent SimpleProperties:n");
- reader.printAll(ndbout);
- #endif
- struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
- sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
- sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
- sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
- UtilPrepareReq::SignalLength, JBB,
- sectionsPtr, UtilPrepareReq::NoOfSections);
- }
- void Trix::executeInsertTransaction(Signal* signal,
- SubscriptionRecPtr subRecPtr,
- SegmentedSectionPtr headerPtr,
- SegmentedSectionPtr dataPtr)
- {
- jam();
- SubscriptionRecord* subRec = subRecPtr.p;
- UtilExecuteReq * utilExecuteReq =
- (UtilExecuteReq *)signal->getDataPtrSend();
- Uint32* headerBuffer = signal->theData + 25;
- Uint32* dataBuffer = headerBuffer + headerPtr.sz;
- utilExecuteReq->senderRef = reference();
- utilExecuteReq->senderData = subRecPtr.i;
- utilExecuteReq->prepareId = subRec->prepareId;
- #if 0
- printf("Header size %un", headerPtr.sz);
- for(int i = 0; i < headerPtr.sz; i++)
- printf("H'%.8x ", headerBuffer[i]);
- printf("n");
-
- printf("Data size %un", dataPtr.sz);
- for(int i = 0; i < dataPtr.sz; i++)
- printf("H'%.8x ", dataBuffer[i]);
- printf("n");
- #endif
- // Save scan result in linear buffers
- copy(headerBuffer, headerPtr);
- copy(dataBuffer, dataPtr);
- // Calculate packed key size
- Uint32 noOfKeyData = 0;
- for(Uint32 i = 0; i < headerPtr.sz; i++) {
- AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
- // Filter out NULL attributes
- if (keyAttrHead->isNULL())
- return;
- if (i < subRec->noOfIndexColumns)
- // Renumber index attributes in consequtive order
- keyAttrHead->setAttributeId(i);
- else
- // Calculate total size of PK attribute
- noOfKeyData += keyAttrHead->getDataSize();
- }
- // Increase expected CONF count
- subRec->expectedConf++;
- // Pack key attributes
- AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns,
- subRec->noOfIndexColumns,
- noOfKeyData);
- struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
- sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
- sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz =
- subRec->noOfIndexColumns + 1;
- sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
- sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
- sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
- UtilExecuteReq::SignalLength, JBB,
- sectionsPtr, UtilExecuteReq::NoOfSections);
- }
- void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr)
- {
- SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend();
- req->senderRef = reference();
- req->senderData = subRecPtr.i;
- req->subscriptionId = subRecPtr.p->subscriptionId;
- req->subscriptionKey = subRecPtr.p->subscriptionKey;
- sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
- SubRemoveReq::SignalLength, JBB);
- }
- void Trix::buildFailed(Signal* signal,
- SubscriptionRecPtr subRecPtr,
- BuildIndxRef::ErrorCode errorCode)
- {
- SubscriptionRecord* subRec = subRecPtr.p;
- subRec->errorCode = errorCode;
- // Continue accumulating since we currently cannot stop SUMA
- subRec->expectedConf--;
- checkParallelism(signal, subRec);
- if (subRec->expectedConf == 0)
- buildComplete(signal, subRecPtr);
- }
- void
- Trix::execSUB_REMOVE_REF(Signal* signal){
- jamEntry();
- //@todo
- ndbrequire(false);
- }
- void
- Trix::execSUB_REMOVE_CONF(Signal* signal){
- jamEntry();
- SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
- SubscriptionRecPtr subRecPtr;
- c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
- if(subRecPtr.p->prepareId != RNIL){
- jam();
- UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
- req->prepareId = subRecPtr.p->prepareId;
- req->senderData = subRecPtr.i;
- sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
- UtilReleaseReq::SignalLength , JBB);
- return;
- }
-
- {
- UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
- conf->senderData = subRecPtr.i;
- execUTIL_RELEASE_CONF(signal);
- }
- }
- void
- Trix::execUTIL_RELEASE_REF(Signal* signal){
- jamEntry();
- ndbrequire(false);
- }
- void
- Trix::execUTIL_RELEASE_CONF(Signal* signal){
-
- UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
-
- SubscriptionRecPtr subRecPtr;
- c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
-
- if(subRecPtr.p->errorCode == BuildIndxRef::NoError){
- // Build is complete, reply to original sender
- BuildIndxConf * buildIndxConf = (BuildIndxConf *)signal->getDataPtrSend();
- buildIndxConf->setUserRef(subRecPtr.p->userReference);
- buildIndxConf->setConnectionPtr(subRecPtr.p->connectionPtr);
- buildIndxConf->setRequestType(BuildIndxReq::RT_TRIX);
- buildIndxConf->setIndexType(subRecPtr.p->indexType);
- buildIndxConf->setTableId(subRecPtr.p->sourceTableId);
- buildIndxConf->setIndexId(subRecPtr.p->targetTableId);
-
- sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXCONF, signal,
- BuildIndxConf::SignalLength , JBB);
- } else {
- // Build failed, reply to original sender
- BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend();
- buildIndxRef->setUserRef(subRecPtr.p->userReference);
- buildIndxRef->setConnectionPtr(subRecPtr.p->connectionPtr);
- buildIndxRef->setRequestType(BuildIndxReq::RT_TRIX);
- buildIndxRef->setIndexType(subRecPtr.p->indexType);
- buildIndxRef->setTableId(subRecPtr.p->sourceTableId);
- buildIndxRef->setIndexId(subRecPtr.p->targetTableId);
- buildIndxRef->setErrorCode(subRecPtr.p->errorCode);
-
- sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXREF, signal,
- BuildIndxRef::SignalLength , JBB);
- }
- // Release subscription record
- subRecPtr.p->attributeOrder.release();
- c_theSubscriptions.release(subRecPtr.i);
- }
- void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec)
- {
- if ((subRec->pendingSubSyncContinueConf) &&
- (subRec->expectedConf < subRec->parallelism)) {
- SubSyncContinueConf * subSyncContinueConf =
- (SubSyncContinueConf *) signal->getDataPtrSend();
- subSyncContinueConf->subscriptionId = subRec->subscriptionId;
- subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
- sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal,
- SubSyncContinueConf::SignalLength , JBB);
- subRec->pendingSubSyncContinueConf = false;
- }
- }
- BLOCK_FUNCTIONS(Trix)
- template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);