UtilTransactions.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:35k
源码类别:
MySQL数据库
开发平台:
Visual C++
- /* Copyright (C) 2003 MySQL AB
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
- #include "UtilTransactions.hpp"
- #include <NdbSleep.h>
- #include <NdbScanFilter.hpp>
- #define VERBOSE 0
- UtilTransactions::UtilTransactions(const NdbDictionary::Table& _tab):
- tab(_tab){
- m_defaultClearMethod = 3;
- }
- UtilTransactions::UtilTransactions(Ndb* ndb, const char * name) :
- tab(* ndb->getDictionary()->getTable(name)){
- m_defaultClearMethod = 3;
- }
- #define RESTART_SCAN 99
- #define RETURN_FAIL(err) return (err.code != 0 ? err.code : NDBT_FAILED)
- int
- UtilTransactions::clearTable(Ndb* pNdb,
- int records,
- int parallelism){
- if(m_defaultClearMethod == 1){
- return clearTable1(pNdb, records, parallelism);
- } else if(m_defaultClearMethod == 2){
- return clearTable2(pNdb, records, parallelism);
- } else {
- return clearTable3(pNdb, records, parallelism);
- }
- }
- int
- UtilTransactions::clearTable1(Ndb* pNdb,
- int records,
- int parallelism){
- #if 1
- return clearTable3(pNdb, records, 1);
- #else
- // Scan all records exclusive and delete
- // them one by one
- int retryAttempt = 0;
- const int retryMax = 100;
- int check;
- NdbConnection *pTrans;
- NdbOperation *pOp;
- while (true){
- if (retryAttempt >= retryMax){
- g_info << "ERROR: Has retried this operation " << retryAttempt
- << " times, failing!" << endl;
- return NDBT_FAILED;
- }
- pTrans = pNdb->startTransaction();
- if (pTrans == NULL) {
- NdbError err = pNdb->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- RETURN_FAIL(err);
- }
- pOp = pTrans->getNdbOperation(tab.getName());
- if (pOp == NULL) {
- NdbError err = pNdb->getNdbError();
- ERR(err);
- pNdb->closeTransaction(pTrans);
- RETURN_FAIL(err);
- }
- check = pOp->openScanExclusive(parallelism);
- if( check == -1 ) {
- NdbError err = pNdb->getNdbError();
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- RETURN_FAIL(err);
- }
- check = pOp->interpret_exit_ok();
- if( check == -1 ) {
- NdbError err = pNdb->getNdbError();
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- RETURN_FAIL(err);
- }
- #if 0
- // It's not necessary to read and PK's
- // Information about the PK's are sent in
- // KEYINFO20 signals anyway and used by takeOverScan
- // Read the primary keys from this table
- for(int a=0; a<tab.getNoOfColumns(); a++){
- if (tab.getColumn(a)->getPrimaryKey()){
- if(pOp->getValue(tab.getColumn(a)->getName()) == NULL){
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- RETURN_FAIL(err);
- }
- }
- }
- #endif
- check = pTrans->executeScan();
- if( check == -1 ) {
- NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- RETURN_FAIL(err);
- }
- int eof;
- int rows = 0;
- eof = pTrans->nextScanResult();
- while(eof == 0){
- rows++;
- int res = takeOverAndDeleteRecord(pNdb, pOp);
- if(res == RESTART_SCAN){
- eof = -2;
- continue;
- }
- if (res != 0){
- NdbError err = pNdb->getNdbError(res);
- pNdb->closeTransaction(pTrans);
- RETURN_FAIL(err);
- }
- eof = pTrans->nextScanResult();
- }
- if (eof == -1) {
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- // If error = 488 there should be no limit on number of retry attempts
- if (err.code != 488)
- retryAttempt++;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- RETURN_FAIL(err);
- }
- if(eof == -2){
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- pNdb->closeTransaction(pTrans);
- g_info << rows << " deleted" << endl;
- return NDBT_OK;
- }
- return NDBT_FAILED;
- #endif
- }
- int
- UtilTransactions::clearTable2(Ndb* pNdb,
- int records,
- int parallelism){
- #if 1
- return clearTable3(pNdb, records, parallelism);
- #else
- // Scan all records exclusive and delete
- // them one by one
- int retryAttempt = 0;
- const int retryMax = 10;
- int deletedRows = 0;
- int check;
- NdbConnection *pTrans;
- NdbOperation *pOp;
- while (true){
- if (retryAttempt >= retryMax){
- g_info << "ERROR: has retried this operation " << retryAttempt
- << " times, failing!" << endl;
- return NDBT_FAILED;
- }
- pTrans = pNdb->startTransaction();
- if (pTrans == NULL) {
- const NdbError err = pNdb->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- return NDBT_FAILED;
- }
- pOp = pTrans->getNdbOperation(tab.getName());
- if (pOp == NULL) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- check = pOp->openScanExclusive(parallelism);
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- check = pOp->interpret_exit_ok();
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- #if 0
- // It's not necessary to read any PK's
- // Information about the PK's are sent in
- // KEYINFO20 signals anyway and used by takeOverScan
- // Read the primary keys from this table
- for(int a=0; a<tab.getNoOfColumns(); a++){
- if (tab.getColumn(a)->getPrimaryKey()){
- if(pOp->getValue(tab.getColumn(a)->getName()) == NULL){
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return -1;
- }
- }
- }
- #endif
- check = pTrans->executeScan();
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- int eof;
- NdbConnection* pDelTrans;
- while((eof = pTrans->nextScanResult(true)) == 0){
- pDelTrans = pNdb->startTransaction();
- if (pDelTrans == NULL) {
- const NdbError err = pNdb->getNdbError();
- #if 0
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- #endif
- ERR(err);
- pNdb->closeTransaction(pDelTrans);
- return NDBT_FAILED;
- }
- do {
- deletedRows++;
- if (addRowToDelete(pNdb, pDelTrans, pOp) != 0){
- pNdb->closeTransaction(pDelTrans);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- } while((eof = pTrans->nextScanResult(false)) == 0);
- check = pDelTrans->execute(Commit);
- if( check == -1 ) {
- const NdbError err = pDelTrans->getNdbError();
- ERR(err);
- pNdb->closeTransaction(pDelTrans);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- pNdb->closeTransaction(pDelTrans);
- }
- if (eof == -1) {
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- // If error = 488 there should be no limit on number of retry attempts
- if (err.code != 488)
- retryAttempt++;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- pNdb->closeTransaction(pTrans);
- g_info << deletedRows << " rows deleted" << endl;
- return NDBT_OK;
- }
- return NDBT_FAILED;
- #endif
- }
- int
- UtilTransactions::clearTable3(Ndb* pNdb,
- int records,
- int parallelism){
- // Scan all records exclusive and delete
- // them one by one
- int retryAttempt = 0;
- const int retryMax = 10;
- int deletedRows = 0;
- int check;
- NdbConnection *pTrans;
- NdbScanOperation *pOp;
- NdbError err;
- int par = parallelism;
- while (true){
- restart:
- if (retryAttempt++ >= retryMax){
- g_info << "ERROR: has retried this operation " << retryAttempt
- << " times, failing!" << endl;
- return NDBT_FAILED;
- }
- pTrans = pNdb->startTransaction();
- if (pTrans == NULL) {
- err = pNdb->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- continue;
- }
- goto failed;
- }
- pOp = pTrans->getNdbScanOperation(tab.getName());
- if (pOp == NULL) {
- err = pTrans->getNdbError();
- if(err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- par = 1;
- goto restart;
- }
- goto failed;
- }
- NdbResultSet * rs = pOp->readTuplesExclusive(par);
- if( rs == 0 ) {
- err = pTrans->getNdbError();
- goto failed;
- }
- if(pTrans->execute(NoCommit) != 0){
- err = pTrans->getNdbError();
- if(err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- continue;
- }
- goto failed;
- }
- while((check = rs->nextResult(true)) == 0){
- do {
- if (rs->deleteTuple() != 0){
- goto failed;
- }
- deletedRows++;
- } while((check = rs->nextResult(false)) == 0);
- if(check != -1){
- check = pTrans->execute(Commit);
- pTrans->restart();
- }
- err = pTrans->getNdbError();
- if(check == -1){
- if(err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- par = 1;
- goto restart;
- }
- goto failed;
- }
- }
- if(check == -1){
- err = pTrans->getNdbError();
- if(err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- par = 1;
- goto restart;
- }
- goto failed;
- }
- pNdb->closeTransaction(pTrans);
- return NDBT_OK;
- }
- return NDBT_FAILED;
- failed:
- if(pTrans != 0) pNdb->closeTransaction(pTrans);
- ERR(err);
- return (err.code != 0 ? err.code : NDBT_FAILED);
- }
- int
- UtilTransactions::copyTableData(Ndb* pNdb,
- const char* destName){
- // Scan all records and copy
- // them to destName table
- int retryAttempt = 0;
- const int retryMax = 10;
- int insertedRows = 0;
- int parallelism = 240;
- int check;
- NdbConnection *pTrans;
- NdbScanOperation *pOp;
- NDBT_ResultRow row(tab);
- while (true){
- if (retryAttempt >= retryMax){
- g_info << "ERROR: has retried this operation " << retryAttempt
- << " times, failing!" << endl;
- return NDBT_FAILED;
- }
- pTrans = pNdb->startTransaction();
- if (pTrans == NULL) {
- const NdbError err = pNdb->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- return NDBT_FAILED;
- }
- pOp = pTrans->getNdbScanOperation(tab.getName());
- if (pOp == NULL) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- NdbResultSet* rs = pOp->readTuples(NdbScanOperation::LM_Read,
- parallelism);
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- check = pOp->interpret_exit_ok();
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- // Read all attributes
- for (int a = 0; a < tab.getNoOfColumns(); a++){
- if ((row.attributeStore(a) =
- pOp->getValue(tab.getColumn(a)->getName())) == 0) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- }
- check = pTrans->execute(NoCommit);
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- int eof;
- while((eof = rs->nextResult(true)) == 0){
- do {
- insertedRows++;
- if (addRowToInsert(pNdb, pTrans, row, destName) != 0){
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- } while((eof = rs->nextResult(false)) == 0);
- check = pTrans->execute(Commit);
- pTrans->restart();
- if( check == -1 ) {
- const NdbError err = pTrans->getNdbError();
- ERR(err);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- }
- if (eof == -1) {
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- // If error = 488 there should be no limit on number of retry attempts
- if (err.code != 488)
- retryAttempt++;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- pNdb->closeTransaction(pTrans);
- g_info << insertedRows << " rows copied" << endl;
- return NDBT_OK;
- }
- return NDBT_FAILED;
- }
- int
- UtilTransactions::addRowToInsert(Ndb* pNdb,
- NdbConnection* pInsTrans,
- NDBT_ResultRow & row,
- const char *insertTabName){
- int check;
- NdbOperation* pInsOp;
- pInsOp = pInsTrans->getNdbOperation(insertTabName);
- if (pInsOp == NULL) {
- ERR(pInsTrans->getNdbError());
- return NDBT_FAILED;
- }
- check = pInsOp->insertTuple();
- if( check == -1 ) {
- ERR(pInsTrans->getNdbError());
- return NDBT_FAILED;
- }
- // Set all attributes
- for (int a = 0; a < tab.getNoOfColumns(); a++){
- NdbRecAttr* r = row.attributeStore(a);
- int sz = r->attrSize() * r->arraySize();
- if (pInsOp->setValue(tab.getColumn(a)->getName(),
- r->aRef(),
- sz) != 0) {
- ERR(pInsTrans->getNdbError());
- return NDBT_FAILED;
- }
- }
- return NDBT_OK;
- }
- int
- UtilTransactions::scanReadRecords(Ndb* pNdb,
- int parallelism,
- NdbOperation::LockMode lm,
- int records,
- int noAttribs,
- int *attrib_list,
- ReadCallBackFn* fn){
- int retryAttempt = 0;
- const int retryMax = 100;
- int check;
- NdbConnection *pTrans;
- NdbScanOperation *pOp;
- NDBT_ResultRow row(tab);
- while (true){
- if (retryAttempt >= retryMax){
- g_info << "ERROR: has retried this operation " << retryAttempt
- << " times, failing!" << endl;
- return NDBT_FAILED;
- }
- pTrans = pNdb->startTransaction();
- if (pTrans == NULL) {
- const NdbError err = pNdb->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- return NDBT_FAILED;
- }
- pOp = pTrans->getNdbScanOperation(tab.getName());
- if (pOp == NULL) {
- const NdbError err = pNdb->getNdbError();
- pNdb->closeTransaction(pTrans);
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- return NDBT_FAILED;
- }
- NdbResultSet * rs = pOp->readTuples(lm, 0, parallelism);
- if( rs == 0 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- check = pOp->interpret_exit_ok();
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- // Call getValue for all the attributes supplied in attrib_list
- // ************************************************
- for (int a = 0; a < noAttribs; a++){
- if (attrib_list[a] < tab.getNoOfColumns()){
- g_info << "getValue(" << attrib_list[a] << ")" << endl;
- if ((row.attributeStore(attrib_list[a]) =
- pOp->getValue(tab.getColumn(attrib_list[a])->getName())) == 0) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- }
- }
- // *************************************************
- check = pTrans->execute(NoCommit);
- if( check == -1 ) {
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- int eof;
- int rows = 0;
- while((eof = rs->nextResult()) == 0){
- rows++;
- // Call callback for each record returned
- if(fn != NULL)
- fn(&row);
- }
- if (eof == -1) {
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- pNdb->closeTransaction(pTrans);
- g_info << rows << " rows have been read" << endl;
- if (records != 0 && rows != records){
- g_info << "Check expected number of records failed" << endl
- << " expected=" << records <<", " << endl
- << " read=" << rows << endl;
- return NDBT_FAILED;
- }
- return NDBT_OK;
- }
- return NDBT_FAILED;
- }
- int
- UtilTransactions::selectCount(Ndb* pNdb,
- int parallelism,
- int* count_rows,
- NdbOperation::LockMode lm,
- NdbConnection* pTrans){
- int retryAttempt = 0;
- const int retryMax = 100;
- int check;
- NdbScanOperation *pOp;
- while (true){
- if (retryAttempt >= retryMax){
- g_info << "ERROR: has retried this operation " << retryAttempt
- << " times, failing!" << endl;
- return NDBT_FAILED;
- }
- if(!pTrans)
- pTrans = pNdb->startTransaction();
- if(!pTrans)
- {
- const NdbError err = pNdb->getNdbError();
- if (err.status == NdbError::TemporaryError)
- continue;
- return NDBT_FAILED;
- }
- pOp = pTrans->getNdbScanOperation(tab.getName());
- if (pOp == NULL) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- pTrans = 0;
- return NDBT_FAILED;
- }
- NdbResultSet * rs = pOp->readTuples(lm);
- if( rs == 0) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- pTrans = 0;
- return NDBT_FAILED;
- }
- if(0){
- NdbScanFilter sf(pOp);
- sf.begin(NdbScanFilter::OR);
- sf.eq(2, (Uint32)30);
- sf.end();
- } else {
- check = pOp->interpret_exit_ok();
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- pTrans = 0;
- return NDBT_FAILED;
- }
- }
- check = pTrans->execute(NoCommit);
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- pTrans = 0;
- return NDBT_FAILED;
- }
- int eof;
- int rows = 0;
- while((eof = rs->nextResult()) == 0){
- rows++;
- }
- if (eof == -1) {
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- pNdb->closeTransaction(pTrans);
- pTrans = 0;
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- pTrans = 0;
- return NDBT_FAILED;
- }
- pNdb->closeTransaction(pTrans);
- pTrans = 0;
- if (count_rows != NULL){
- *count_rows = rows;
- }
- return NDBT_OK;
- }
- return NDBT_FAILED;
- }
- int
- UtilTransactions::verifyIndex(Ndb* pNdb,
- const char* indexName,
- int parallelism,
- bool transactional){
- const NdbDictionary::Index* pIndex
- = pNdb->getDictionary()->getIndex(indexName, tab.getName());
- if (pIndex == 0){
- ndbout << " Index " << indexName << " does not exist!" << endl;
- return NDBT_FAILED;
- }
- switch (pIndex->getType()){
- case NdbDictionary::Index::UniqueHashIndex:
- return verifyUniqueIndex(pNdb, pIndex, parallelism, transactional);
- case NdbDictionary::Index::OrderedIndex:
- return verifyOrderedIndex(pNdb, pIndex, parallelism, transactional);
- break;
- default:
- ndbout << "Unknown index type" << endl;
- break;
- }
- return NDBT_FAILED;
- }
- int
- UtilTransactions::verifyUniqueIndex(Ndb* pNdb,
- const NdbDictionary::Index * pIndex,
- int parallelism,
- bool transactional){
- /**
- * Scan all rows in TABLE and for each found row make one read in
- * TABLE and one using INDEX_TABLE. Then compare the two returned
- * rows. They should be equal!
- *
- */
- if (scanAndCompareUniqueIndex(pNdb,
- pIndex,
- parallelism,
- transactional) != NDBT_OK){
- return NDBT_FAILED;
- }
- return NDBT_OK;
- }
- int
- UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
- const NdbDictionary::Index* pIndex,
- int parallelism,
- bool transactional){
- int retryAttempt = 0;
- const int retryMax = 100;
- int check;
- NdbConnection *pTrans;
- NdbScanOperation *pOp;
- NDBT_ResultRow row(tab);
- parallelism = 1;
- while (true){
- if (retryAttempt >= retryMax){
- g_info << "ERROR: has retried this operation " << retryAttempt
- << " times, failing!" << endl;
- return NDBT_FAILED;
- }
- pTrans = pNdb->startTransaction();
- if (pTrans == NULL) {
- const NdbError err = pNdb->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- return NDBT_FAILED;
- }
- pOp = pTrans->getNdbScanOperation(tab.getName());
- if (pOp == NULL) {
- const NdbError err = pNdb->getNdbError();
- pNdb->closeTransaction(pTrans);
- ERR(err);
- if (err.status == NdbError::TemporaryError){
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- return NDBT_FAILED;
- }
- NdbResultSet* rs;
- if(transactional){
- rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
- } else {
- rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallelism);
- }
- if( rs == 0 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- check = pOp->interpret_exit_ok();
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- // Read all attributes
- for (int a = 0; a < tab.getNoOfColumns(); a++){
- if ((row.attributeStore(a) =
- pOp->getValue(tab.getColumn(a)->getName())) == 0) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- }
- check = pTrans->execute(NoCommit);
- if( check == -1 ) {
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- int eof;
- int rows = 0;
- while((eof = rs->nextResult()) == 0){
- rows++;
- // ndbout << row.c_str().c_str() << endl;
- if (readRowFromTableAndIndex(pNdb,
- pTrans,
- pIndex,
- row) != NDBT_OK){
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- }
- if (eof == -1) {
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- rows--;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- pNdb->closeTransaction(pTrans);
- return NDBT_OK;
- }
- return NDBT_FAILED;
- }
- int
- UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb,
- NdbConnection* scanTrans,
- const NdbDictionary::Index* pIndex,
- NDBT_ResultRow& row ){
- NdbDictionary::Index::Type indexType= pIndex->getType();
- int retryAttempt = 0;
- const int retryMax = 100;
- int check, a;
- NdbConnection *pTrans1=NULL;
- NdbResultSet *cursor= NULL;
- NdbOperation *pOp;
- int return_code= NDBT_FAILED;
- // Allocate place to store the result
- NDBT_ResultRow tabRow(tab);
- NDBT_ResultRow indexRow(tab);
- const char * indexName = pIndex->getName();
- while (true){
- if(retryAttempt)
- ndbout_c("retryAttempt %d", retryAttempt);
- if (retryAttempt >= retryMax){
- g_info << "ERROR: has retried this operation " << retryAttempt
- << " times, failing!" << endl;
- goto close_all;
- }
- pTrans1 = pNdb->hupp(scanTrans); //startTransaction();
- if (pTrans1 == NULL) {
- const NdbError err = pNdb->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- if(err.code == 0){
- return_code = NDBT_OK;
- goto close_all;
- }
- ERR(err);
- goto close_all;
- }
- /**
- * Read the record from TABLE
- */
- pOp = pTrans1->getNdbOperation(tab.getName());
- if (pOp == NULL) {
- ERR(pTrans1->getNdbError());
- goto close_all;
- }
- check = pOp->readTuple();
- if( check == -1 ) {
- ERR(pTrans1->getNdbError());
- pNdb->closeTransaction(pTrans1);
- goto close_all;
- }
- // Define primary keys
- #if VERBOSE
- printf("PK: ");
- #endif
- for(a = 0; a<tab.getNoOfColumns(); a++){
- const NdbDictionary::Column* attr = tab.getColumn(a);
- if (attr->getPrimaryKey() == true){
- if (pOp->equal(attr->getName(), row.attributeStore(a)->aRef()) != 0){
- ERR(pTrans1->getNdbError());
- goto close_all;
- }
- #if VERBOSE
- printf("%s = %d: ", attr->getName(), row.attributeStore(a)->aRef());
- #endif
- }
- }
- #if VERBOSE
- printf("n");
- #endif
- // Read all attributes
- #if VERBOSE
- printf("Reading %u attributes: ", tab.getNoOfColumns());
- #endif
- for(a = 0; a<tab.getNoOfColumns(); a++){
- if((tabRow.attributeStore(a) =
- pOp->getValue(tab.getColumn(a)->getName())) == 0) {
- ERR(pTrans1->getNdbError());
- goto close_all;
- }
- #if VERBOSE
- printf("%s ", tab.getColumn(a)->getName());
- #endif
- }
- #if VERBOSE
- printf("n");
- #endif
- /**
- * Read the record from INDEX_TABLE
- */
- NdbIndexOperation* pIndexOp= NULL;
- NdbIndexScanOperation *pScanOp= NULL;
- NdbOperation *pIOp= 0;
- bool null_found= false;
- for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
- const NdbDictionary::Column * col = pIndex->getColumn(a);
- if (row.attributeStore(col->getName())->isNULL())
- {
- null_found= true;
- break;
- }
- }
- const char * tabName= tab.getName();
- if(!null_found)
- {
- if (indexType == NdbDictionary::Index::UniqueHashIndex) {
- pIOp= pIndexOp= pTrans1->getNdbIndexOperation(indexName, tabName);
- } else {
- pIOp= pScanOp= pTrans1->getNdbIndexScanOperation(indexName, tabName);
- }
- if (pIOp == NULL) {
- ERR(pTrans1->getNdbError());
- goto close_all;
- }
- {
- bool not_ok;
- if (pIndexOp) {
- not_ok = pIndexOp->readTuple() == -1;
- } else {
- not_ok = (cursor= pScanOp->readTuples()) == 0;
- }
- if( not_ok ) {
- ERR(pTrans1->getNdbError());
- goto close_all;
- }
- }
- // Define primary keys for index
- #if VERBOSE
- printf("SI: ");
- #endif
- for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
- const NdbDictionary::Column * col = pIndex->getColumn(a);
- int r;
- if ( !row.attributeStore(col->getName())->isNULL() ) {
- if(pIOp->equal(col->getName(),
- row.attributeStore(col->getName())->aRef()) != 0){
- ERR(pTrans1->getNdbError());
- goto close_all;
- }
- }
- #if VERBOSE
- printf("%s = %d: ", col->getName(), row.attributeStore(a)->aRef());
- #endif
- }
- #if VERBOSE
- printf("n");
- #endif
- // Read all attributes
- #if VERBOSE
- printf("Reading %u attributes: ", tab.getNoOfColumns());
- #endif
- for(a = 0; a<tab.getNoOfColumns(); a++){
- void* pCheck;
- pCheck= indexRow.attributeStore(a)=
- pIOp->getValue(tab.getColumn(a)->getName());
- if(pCheck == NULL) {
- ERR(pTrans1->getNdbError());
- goto close_all;
- }
- #if VERBOSE
- printf("%s ", tab.getColumn(a)->getName());
- #endif
- }
- }
- #if VERBOSE
- printf("n");
- #endif
- check = pTrans1->execute(Commit);
- if( check == -1 ) {
- const NdbError err = pTrans1->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans1);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ndbout << "Error when comparing records - normal op" << endl;
- ERR(err);
- ndbout << "row: " << row.c_str().c_str() << endl;
- goto close_all;
- }
- /**
- * Compare the two rows
- */
- if(!null_found){
- if (pScanOp) {
- if (cursor->nextResult() != 0){
- const NdbError err = pTrans1->getNdbError();
- ERR(err);
- ndbout << "Error when comparing records - index op next_result missing" << endl;
- ndbout << "row: " << row.c_str().c_str() << endl;
- goto close_all;
- }
- }
- if (!(tabRow.c_str() == indexRow.c_str())){
- ndbout << "Error when comapring records" << endl;
- ndbout << " tabRow: n" << tabRow.c_str().c_str() << endl;
- ndbout << " indexRow: n" << indexRow.c_str().c_str() << endl;
- goto close_all;
- }
- if (pScanOp) {
- if (cursor->nextResult() == 0){
- ndbout << "Error when comparing records - index op next_result to many" << endl;
- ndbout << "row: " << row.c_str().c_str() << endl;
- goto close_all;
- }
- }
- }
- return_code= NDBT_OK;
- goto close_all;
- }
- close_all:
- if (cursor)
- cursor->close();
- if (pTrans1)
- pNdb->closeTransaction(pTrans1);
- return return_code;
- }
- int
- UtilTransactions::verifyOrderedIndex(Ndb* pNdb,
- const NdbDictionary::Index* pIndex,
- int parallelism,
- bool transactional){
- int retryAttempt = 0;
- const int retryMax = 100;
- int check;
- NdbConnection *pTrans;
- NdbScanOperation *pOp;
- NdbIndexScanOperation * iop = 0;
- NdbResultSet* cursor= 0;
- NDBT_ResultRow scanRow(tab);
- NDBT_ResultRow pkRow(tab);
- NDBT_ResultRow indexRow(tab);
- const char * indexName = pIndex->getName();
- int res;
- parallelism = 1;
- while (true){
- if (retryAttempt >= retryMax){
- g_info << "ERROR: has retried this operation " << retryAttempt
- << " times, failing!" << endl;
- return NDBT_FAILED;
- }
- pTrans = pNdb->startTransaction();
- if (pTrans == NULL) {
- const NdbError err = pNdb->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- return NDBT_FAILED;
- }
- pOp = pTrans->getNdbScanOperation(tab.getName());
- if (pOp == NULL) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- NdbResultSet*
- rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
- if( rs == 0 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- check = pOp->interpret_exit_ok();
- if( check == -1 ) {
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- if(get_values(pOp, scanRow))
- {
- abort();
- }
- check = pTrans->execute(NoCommit);
- if( check == -1 ) {
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- int eof;
- int rows = 0;
- while(check == 0 && (eof = rs->nextResult()) == 0){
- rows++;
- bool null_found= false;
- for(int a = 0; a<(int)pIndex->getNoOfColumns(); a++){
- const NdbDictionary::Column * col = pIndex->getColumn(a);
- if (scanRow.attributeStore(col->getName())->isNULL())
- {
- null_found= true;
- break;
- }
- }
- // Do pk lookup
- NdbOperation * pk = pTrans->getNdbOperation(tab.getName());
- if(!pk || pk->readTuple())
- goto error;
- if(equal(&tab, pk, scanRow) || get_values(pk, pkRow))
- goto error;
- if(!null_found)
- {
- if(!iop && (iop= pTrans->getNdbIndexScanOperation(indexName,
- tab.getName())))
- {
- cursor= iop->readTuples(NdbScanOperation::LM_CommittedRead,
- parallelism);
- iop->interpret_exit_ok();
- if(!cursor || get_values(iop, indexRow))
- goto error;
- }
- else if(!iop || iop->reset_bounds())
- {
- goto error;
- }
- if(equal(pIndex, iop, scanRow))
- goto error;
- }
- check = pTrans->execute(NoCommit);
- if(check)
- goto error;
- if(scanRow.c_str() != pkRow.c_str()){
- g_err << "Error when comapring records" << endl;
- g_err << " scanRow: n" << scanRow.c_str().c_str() << endl;
- g_err << " pkRow: n" << pkRow.c_str().c_str() << endl;
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- if(!null_found)
- {
- if((res= cursor->nextResult()) != 0){
- g_err << "Failed to find row using index: " << res << endl;
- ERR(pTrans->getNdbError());
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- if(scanRow.c_str() != indexRow.c_str()){
- g_err << "Error when comapring records" << endl;
- g_err << " scanRow: n" << scanRow.c_str().c_str() << endl;
- g_err << " indexRow: n" << indexRow.c_str().c_str() << endl;
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- if(cursor->nextResult() == 0){
- g_err << "Found extra row!!" << endl;
- g_err << " indexRow: n" << indexRow.c_str().c_str() << endl;
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- }
- }
- if (eof == -1 || check == -1) {
- error:
- const NdbError err = pTrans->getNdbError();
- if (err.status == NdbError::TemporaryError){
- ERR(err);
- iop = 0;
- pNdb->closeTransaction(pTrans);
- NdbSleep_MilliSleep(50);
- retryAttempt++;
- rows--;
- continue;
- }
- ERR(err);
- pNdb->closeTransaction(pTrans);
- return NDBT_FAILED;
- }
- pNdb->closeTransaction(pTrans);
- return NDBT_OK;
- }
- return NDBT_FAILED;
- }
- int
- UtilTransactions::get_values(NdbOperation* op, NDBT_ResultRow& dst)
- {
- for (int a = 0; a < tab.getNoOfColumns(); a++){
- NdbRecAttr*& ref= dst.attributeStore(a);
- if ((ref= op->getValue(a)) == 0)
- {
- return NDBT_FAILED;
- }
- }
- return 0;
- }
- int
- UtilTransactions::equal(const NdbDictionary::Index* pIndex,
- NdbOperation* op, const NDBT_ResultRow& src)
- {
- for(Uint32 a = 0; a<pIndex->getNoOfColumns(); a++){
- const NdbDictionary::Column * col = pIndex->getColumn(a);
- if(op->equal(col->getName(),
- src.attributeStore(col->getName())->aRef()) != 0){
- return NDBT_FAILED;
- }
- }
- return 0;
- }
- int
- UtilTransactions::equal(const NdbDictionary::Table* pTable,
- NdbOperation* op, const NDBT_ResultRow& src)
- {
- for(Uint32 a = 0; a<tab.getNoOfColumns(); a++){
- const NdbDictionary::Column* attr = tab.getColumn(a);
- if (attr->getPrimaryKey() == true){
- if (op->equal(attr->getName(), src.attributeStore(a)->aRef()) != 0){
- return NDBT_FAILED;
- }
- }
- }
- return 0;
- }