Ndb.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:36k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. /*****************************************************************************
  14. Name:          Ndb.cpp
  15. ******************************************************************************/
  16. #include <ndb_global.h>
  17. #include "NdbApiSignal.hpp"
  18. #include "NdbImpl.hpp"
  19. #include <NdbOperation.hpp>
  20. #include <NdbConnection.hpp>
  21. #include <NdbEventOperation.hpp>
  22. #include <NdbRecAttr.hpp>
  23. #include <md5_hash.hpp>
  24. #include <NdbSleep.h>
  25. #include <NdbOut.hpp>
  26. #include <ndb_limits.h>
  27. #include "API.hpp"
  28. #include <NdbEnv.h>
  29. #include <BaseString.hpp>
  30. /****************************************************************************
  31. void connect();
  32. Connect to any node which has no connection at the moment.
  33. ****************************************************************************/
  34. NdbConnection* Ndb::doConnect(Uint32 tConNode) 
  35. {
  36.   Uint32        tNode;
  37.   Uint32        tAnyAlive = 0;
  38.   int TretCode;
  39.   if (tConNode != 0) {
  40.     TretCode = NDB_connect(tConNode);
  41.     if ((TretCode == 1) || (TretCode == 2)) {
  42. //****************************************************************************
  43. // We have connections now to the desired node. Return
  44. //****************************************************************************
  45.       return getConnectedNdbConnection(tConNode);
  46.     } else if (TretCode != 0) {
  47.       tAnyAlive = 1;
  48.     }//if
  49.   }//if
  50. //****************************************************************************
  51. // We will connect to any node. Make sure that we have connections to all
  52. // nodes.
  53. //****************************************************************************
  54.   if (theImpl->m_optimized_node_selection)
  55.   {
  56.     Ndb_cluster_connection_node_iter &node_iter= 
  57.       theImpl->m_node_iter;
  58.     theImpl->m_ndb_cluster_connection.init_get_next_node(node_iter);
  59.     while ((tNode= theImpl->m_ndb_cluster_connection.get_next_node(node_iter)))
  60.     {
  61.       TretCode= NDB_connect(tNode);
  62.       if ((TretCode == 1) ||
  63.   (TretCode == 2))
  64.       {
  65. //****************************************************************************
  66. // We have connections now to the desired node. Return
  67. //****************************************************************************
  68. return getConnectedNdbConnection(tNode);
  69.       } else if (TretCode != 0) {
  70. tAnyAlive= 1;
  71.       }//if
  72.     }
  73.   }
  74.   else // just do a regular round robin
  75.   {
  76.     Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes;
  77.     Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex;
  78.     UintR Tcount = 0;
  79.     do {
  80.       theCurrentConnectIndex++;
  81.       if (theCurrentConnectIndex >= tNoOfDbNodes)
  82. theCurrentConnectIndex = 0;
  83.       Tcount++;
  84.       tNode= theImpl->theDBnodes[theCurrentConnectIndex];
  85.       TretCode= NDB_connect(tNode);
  86.       if ((TretCode == 1) ||
  87.   (TretCode == 2))
  88.       {
  89. //****************************************************************************
  90. // We have connections now to the desired node. Return
  91. //****************************************************************************
  92. return getConnectedNdbConnection(tNode);
  93.       } else if (TretCode != 0) {
  94. tAnyAlive= 1;
  95.       }//if
  96.     } while (Tcount < tNoOfDbNodes);
  97.   }
  98. //****************************************************************************
  99. // We were unable to find a free connection. If no node alive we will report
  100. // error code for cluster failure otherwise connection failure.
  101. //****************************************************************************
  102.   if (tAnyAlive == 1) {
  103. #ifdef VM_TRACE
  104.     ndbout << "TretCode = " << TretCode << endl;
  105. #endif
  106.     theError.code = 4006;
  107.   } else {
  108.     theError.code = 4009;
  109.   }//if
  110.   return NULL;
  111. }
  112. int 
  113. Ndb::NDB_connect(Uint32 tNode) 
  114. {
  115. //****************************************************************************
  116. // We will perform seize of a transaction record in DBTC in the specified node.
  117. //***************************************************************************
  118.   
  119.   int          tReturnCode;
  120.   TransporterFacade *tp = TransporterFacade::instance();
  121.   bool nodeAvail = tp->get_node_alive(tNode);
  122.   if(nodeAvail == false){
  123.     return 0;
  124.   }
  125.   
  126.   NdbConnection * tConArray = theConnectionArray[tNode];
  127.   if (tConArray != NULL) {
  128.     return 2;
  129.   }
  130.   
  131.   NdbConnection * tNdbCon = getNdbCon(); // Get free connection object.
  132.   if (tNdbCon == NULL) {
  133.     return 4;
  134.   }//if
  135.   NdbApiSignal* tSignal = getSignal(); // Get signal object
  136.   if (tSignal == NULL) {
  137.     releaseNdbCon(tNdbCon);
  138.     return 4;
  139.   }//if
  140.   if (tSignal->setSignal(GSN_TCSEIZEREQ) == -1) {
  141.     releaseNdbCon(tNdbCon);
  142.     releaseSignal(tSignal);
  143.     return 4;
  144.   }//if
  145.   tSignal->setData(tNdbCon->ptr2int(), 1);
  146. //************************************************
  147. // Set connection pointer as NdbConnection object
  148. //************************************************
  149.   tSignal->setData(theMyRef, 2); // Set my block reference
  150.   tNdbCon->Status(NdbConnection::Connecting); // Set status to connecting
  151.   Uint32 nodeSequence;
  152.   { // send and receive signal
  153.     Guard guard(tp->theMutexPtr);
  154.     nodeSequence = tp->getNodeSequence(tNode);
  155.     bool node_is_alive = tp->get_node_alive(tNode);
  156.     if (node_is_alive) { 
  157.       tReturnCode = tp->sendSignal(tSignal, tNode);  
  158.       releaseSignal(tSignal); 
  159.       if (tReturnCode != -1) {
  160.         theImpl->theWaiter.m_node = tNode;  
  161.         theImpl->theWaiter.m_state = WAIT_TC_SEIZE;  
  162.         tReturnCode = receiveResponse(); 
  163.       }//if
  164.     } else {
  165.       releaseSignal(tSignal);
  166.       tReturnCode = -1;
  167.     }//if
  168.   }
  169.   if ((tReturnCode == 0) && (tNdbCon->Status() == NdbConnection::Connected)) {
  170.     //************************************************
  171.     // Send and receive was successful
  172.     //************************************************
  173.     NdbConnection* tPrevFirst = theConnectionArray[tNode];
  174.     tNdbCon->setConnectedNodeId(tNode, nodeSequence);
  175.     
  176.     tNdbCon->setMyBlockReference(theMyRef);
  177.     theConnectionArray[tNode] = tNdbCon;
  178.     tNdbCon->theNext = tPrevFirst;
  179.     return 1;
  180.   } else {
  181.     releaseNdbCon(tNdbCon);
  182. //****************************************************************************
  183. // Unsuccessful connect is indicated by 3.
  184. //****************************************************************************
  185.     return 3;
  186.   }//if
  187. }//Ndb::NDB_connect()
  188. NdbConnection *
  189. Ndb::getConnectedNdbConnection(Uint32 nodeId){
  190.   NdbConnection* next = theConnectionArray[nodeId];
  191.   theConnectionArray[nodeId] = next->theNext;
  192.   next->theNext = NULL;
  193.   return next;
  194. }//Ndb::getConnectedNdbConnection()
  195. /*****************************************************************************
  196. disconnect();
  197. Remark:        Disconnect all connections to the database. 
  198. *****************************************************************************/
  199. void 
  200. Ndb::doDisconnect()
  201. {
  202.   DBUG_ENTER("Ndb::doDisconnect");
  203.   NdbConnection* tNdbCon;
  204.   CHECK_STATUS_MACRO_VOID;
  205.   Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
  206.   Uint8 *theDBnodes= theImpl->theDBnodes;
  207.   DBUG_PRINT("info", ("theNoOfDBnodes=%d", tNoOfDbNodes));
  208.   UintR i;
  209.   for (i = 0; i < tNoOfDbNodes; i++) {
  210.     Uint32 tNode = theDBnodes[i];
  211.     tNdbCon = theConnectionArray[tNode];
  212.     while (tNdbCon != NULL) {
  213.       NdbConnection* tmpNdbCon = tNdbCon;
  214.       tNdbCon = tNdbCon->theNext;
  215.       releaseConnectToNdb(tmpNdbCon);
  216.     }//while
  217.   }//for
  218.   tNdbCon = theTransactionList;
  219.   while (tNdbCon != NULL) {
  220.     NdbConnection* tmpNdbCon = tNdbCon;
  221.     tNdbCon = tNdbCon->theNext;
  222.     releaseConnectToNdb(tmpNdbCon);
  223.   }//while
  224.   DBUG_VOID_RETURN;
  225. }//Ndb::disconnect()
  226. /*****************************************************************************
  227. int waitUntilReady(int timeout);
  228. Return Value:   Returns 0 if the Ndb is ready within timeout seconds.
  229.                 Returns -1 otherwise.
  230. Remark:         Waits until a node has status != 0
  231. *****************************************************************************/ 
  232. int
  233. Ndb::waitUntilReady(int timeout)
  234. {
  235.   DBUG_ENTER("Ndb::waitUntilReady");
  236.   int secondsCounter = 0;
  237.   int milliCounter = 0;
  238.   int noChecksSinceFirstAliveFound = 0;
  239.   int id;
  240.   if (theInitState != Initialised) {
  241.     // Ndb::init is not called
  242.     theError.code = 4256;
  243.     DBUG_RETURN(-1);
  244.   }
  245.   while (theNode == 0) {
  246.     if (secondsCounter >= timeout)
  247.     {
  248.       theError.code = 4269;
  249.       DBUG_RETURN(-1);
  250.     }
  251.     NdbSleep_MilliSleep(100);
  252.     milliCounter += 100;
  253.     if (milliCounter >= 1000) {
  254.       secondsCounter++;
  255.       milliCounter = 0;
  256.     }//if
  257.   }
  258.   if (theImpl->m_ndb_cluster_connection.wait_until_ready
  259.       (timeout-secondsCounter,30) < 0)
  260.   {
  261.     theError.code = 4009;
  262.     DBUG_RETURN(-1);
  263.   }
  264.   DBUG_RETURN(0);
  265. }
  266. /*****************************************************************************
  267. NdbConnection* startTransaction();
  268. Return Value:   Returns a pointer to a connection object.
  269.                 Return NULL otherwise.
  270. Remark:         Start transaction. Synchronous.
  271. *****************************************************************************/ 
  272. NdbConnection* 
  273. Ndb::startTransaction(Uint32 aPriority, const char * keyData, Uint32 keyLen)
  274. {
  275.   DBUG_ENTER("Ndb::startTransaction");
  276.   if (theInitState == Initialised) {
  277.     theError.code = 0;
  278.     checkFailedNode();
  279.   /**
  280.    * If the user supplied key data
  281.    * We will make a qualified quess to which node is the primary for the
  282.    * the fragment and contact that node
  283.    */
  284.     Uint32 nodeId;
  285.     if(keyData != 0) {
  286.       nodeId = 0; // guess not supported
  287.       // nodeId = m_ndb_cluster_connection->guess_primary_node(keyData, keyLen);
  288.     } else {
  289.       nodeId = 0;
  290.     }//if
  291.     {
  292.       NdbConnection *trans= startTransactionLocal(aPriority, nodeId);
  293.       DBUG_PRINT("exit",("start trans: 0x%x transid: 0x%llx",
  294.  trans, trans ? trans->getTransactionId() : 0));
  295.       DBUG_RETURN(trans);
  296.     }
  297.   } else {
  298.     DBUG_RETURN(NULL);
  299.   }//if
  300. }//Ndb::startTransaction()
  301. /*****************************************************************************
  302. NdbConnection* hupp(NdbConnection* pBuddyTrans);
  303. Return Value:   Returns a pointer to a connection object.
  304.                 Connected to the same node as pBuddyTrans
  305.                 and also using the same transction id
  306. Remark:         Start transaction. Synchronous.
  307. *****************************************************************************/ 
  308. NdbConnection* 
  309. Ndb::hupp(NdbConnection* pBuddyTrans)
  310. {
  311.   DBUG_ENTER("Ndb::hupp");
  312.   DBUG_PRINT("enter", ("trans: 0x%x",pBuddyTrans));
  313.   Uint32 aPriority = 0;
  314.   if (pBuddyTrans == NULL){
  315.     DBUG_RETURN(startTransaction());
  316.   }
  317.   if (theInitState == Initialised) {
  318.     theError.code = 0;
  319.     checkFailedNode();
  320.     Uint32 nodeId = pBuddyTrans->getConnectedNodeId();
  321.     NdbConnection* pCon = startTransactionLocal(aPriority, nodeId);
  322.     if(pCon == NULL)
  323.       DBUG_RETURN(NULL);
  324.     if (pCon->getConnectedNodeId() != nodeId){
  325.       // We could not get a connection to the desired node
  326.       // release the connection and return NULL
  327.       closeTransaction(pCon);
  328.       theError.code = 4006;
  329.       DBUG_RETURN(NULL);
  330.     }
  331.     pCon->setTransactionId(pBuddyTrans->getTransactionId());
  332.     pCon->setBuddyConPtr((Uint32)pBuddyTrans->getTC_ConnectPtr());
  333.     DBUG_PRINT("exit", ("hupp trans: 0x%x transid: 0x%llx",
  334. pCon, pCon ? pCon->getTransactionId() : 0));
  335.     DBUG_RETURN(pCon);
  336.   } else {
  337.     DBUG_RETURN(NULL);
  338.   }//if
  339. }//Ndb::hupp()
  340. NdbConnection* 
  341. Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
  342. {
  343. #ifdef VM_TRACE
  344.   char buf[255];
  345.   const char* val = NdbEnv_GetEnv("NDB_TRANSACTION_NODE_ID", buf, 255);
  346.   if(val != 0){
  347.     nodeId = atoi(val);
  348.   }
  349. #endif
  350.   DBUG_ENTER("Ndb::startTransactionLocal");
  351.   DBUG_PRINT("enter", ("nodeid: %d", nodeId));
  352.   NdbConnection* tConnection;
  353.   Uint64 tFirstTransId = theFirstTransId;
  354.   tConnection = doConnect(nodeId);
  355.   if (tConnection == NULL) {
  356.     DBUG_RETURN(NULL);
  357.   }//if
  358.   NdbConnection* tConNext = theTransactionList;
  359.   tConnection->init();
  360.   theTransactionList = tConnection;        // into a transaction list.
  361.   tConnection->next(tConNext);   // Add the active connection object
  362.   tConnection->setTransactionId(tFirstTransId);
  363.   tConnection->thePriority = aPriority;
  364.   if ((tFirstTransId & 0xFFFFFFFF) == 0xFFFFFFFF) {
  365.     //---------------------------------------------------
  366. // Transaction id rolling round. We will start from
  367. // consecutive identity 0 again.
  368. //---------------------------------------------------
  369.     theFirstTransId = ((tFirstTransId >> 32) << 32);      
  370.   } else {
  371.     theFirstTransId = tFirstTransId + 1;
  372.   }//if
  373. #ifdef VM_TRACE
  374.   if (tConnection->theListState != NdbConnection::NotInList) {
  375.     printState("startTransactionLocal %x", tConnection);
  376.     abort();
  377.   }
  378. #endif
  379.   DBUG_RETURN(tConnection);
  380. }//Ndb::startTransactionLocal()
  381. /*****************************************************************************
  382. void closeTransaction(NdbConnection* aConnection);
  383. Parameters:     aConnection: the connection used in the transaction.
  384. Remark:         Close transaction by releasing the connection and all operations.
  385. *****************************************************************************/
  386. void
  387. Ndb::closeTransaction(NdbConnection* aConnection)
  388. {
  389.   DBUG_ENTER("Ndb::closeTransaction");
  390.   NdbConnection* tCon;
  391.   NdbConnection* tPreviousCon;
  392.   if (aConnection == NULL) {
  393. //-----------------------------------------------------
  394. // closeTransaction called on NULL pointer, destructive
  395. // application behaviour.
  396. //-----------------------------------------------------
  397. #ifdef VM_TRACE
  398.     printf("NULL into closeTransactionn");
  399. #endif
  400.     DBUG_VOID_RETURN;
  401.   }//if
  402.   CHECK_STATUS_MACRO_VOID;
  403.   
  404.   tCon = theTransactionList;
  405.   
  406.   DBUG_PRINT("info",("close trans: 0x%x transid: 0x%llx",
  407.      aConnection, aConnection->getTransactionId()));
  408.   DBUG_PRINT("info",("magic number: 0x%x TCConPtr: 0x%x theMyRef: 0x%x 0x%x",
  409.      aConnection->theMagicNumber, aConnection->theTCConPtr,
  410.      aConnection->theMyRef, getReference()));
  411.   if (aConnection == tCon) { // Remove the active connection object
  412.     theTransactionList = tCon->next(); // from the transaction list.
  413.   } else { 
  414.     while (aConnection != tCon) {
  415.       if (tCon == NULL) {
  416. //-----------------------------------------------------
  417. // closeTransaction called on non-existing transaction
  418. //-----------------------------------------------------
  419. if(aConnection->theError.code == 4008){
  420.   /**
  421.    * When a SCAN timed-out, returning the NdbConnection leads
  422.    * to reuse. And TC crashes when the API tries to reuse it to
  423.    * something else...
  424.    */
  425. #ifdef VM_TRACE
  426.   printf("Scan timeout:ed NdbConnection-> "
  427.  "not returning it-> memory leakn");
  428. #endif
  429.   DBUG_VOID_RETURN;
  430. }
  431. #ifdef VM_TRACE
  432. printf("Non-existing transaction into closeTransactionn");
  433. abort();
  434. #endif
  435. DBUG_VOID_RETURN;
  436.       }//if
  437.       tPreviousCon = tCon;
  438.       tCon = tCon->next();
  439.     }//while
  440.     tPreviousCon->next(tCon->next());
  441.   }//if
  442.   
  443.   aConnection->release();
  444.   
  445.   if(aConnection->theError.code == 4008){
  446.     /**
  447.      * Something timed-out, returning the NdbConnection leads
  448.      * to reuse. And TC crashes when the API tries to reuse it to
  449.      * something else...
  450.      */
  451. #ifdef VM_TRACE
  452.     printf("Con timeout:ed NdbConnection-> not returning it-> memory leakn");
  453. #endif
  454.     DBUG_VOID_RETURN;
  455.   }
  456.   
  457.   if (aConnection->theReleaseOnClose == false) {
  458.     /**
  459.      * Put it back in idle list for that node
  460.      */
  461.     Uint32 nodeId = aConnection->getConnectedNodeId();
  462.     aConnection->theNext = theConnectionArray[nodeId];
  463.     theConnectionArray[nodeId] = aConnection;
  464.     DBUG_VOID_RETURN;
  465.   } else {
  466.     aConnection->theReleaseOnClose = false;
  467.     releaseNdbCon(aConnection);
  468.   }//if
  469.   DBUG_VOID_RETURN;
  470. }//Ndb::closeTransaction()
  471. /*****************************************************************************
  472. int* NdbTamper(int aAction, int aNode);
  473. Parameters: aAction     Specifies what action to be taken
  474.             1: Lock global checkpointing    Can only be sent to master DIH, Parameter aNode ignored.
  475.             2: UnLock global checkpointing    Can only be sent to master DIH, Parameter aNode ignored.
  476.     3: Crash node
  477.            aNode        Specifies which node the action will be taken
  478.         -1: Master DIH 
  479.         0-16: Nodnumber
  480. Return Value: -1 Error  .
  481.                 
  482. Remark:         Sends a signal to DIH.
  483. *****************************************************************************/ 
  484. int 
  485. Ndb::NdbTamper(TamperType aAction, int aNode)
  486. {
  487.   NdbConnection* tNdbConn;
  488.   NdbApiSignal tSignal(theMyRef);
  489.   int tNode;
  490.   int                   tAction;
  491.   int ret_code;
  492. #ifdef CUSTOMER_RELEASE
  493.   return -1;
  494. #else
  495.   CHECK_STATUS_MACRO;
  496.   checkFailedNode();
  497.   theRestartGCI = 0;
  498.   switch (aAction) {
  499. // Translate enum to integer. This is done because the SCI layer
  500. // expects integers. 
  501.      case LockGlbChp:
  502.         tAction = 1;
  503.         break;
  504.      case UnlockGlbChp:
  505.         tAction = 2;
  506. break;
  507.      case CrashNode:
  508.         tAction = 3;
  509.         break;
  510.      case ReadRestartGCI:
  511. tAction = 4;
  512. break;
  513.      default:
  514.         theError.code = 4102;
  515.         return -1;
  516.   }
  517.   tNdbConn = getNdbCon(); // Get free connection object
  518.   if (tNdbConn == NULL) {
  519.     theError.code = 4000;
  520.     return -1;
  521.   }
  522.   tSignal.setSignal(GSN_DIHNDBTAMPER);
  523.   tSignal.setData (tAction, 1);
  524.   tSignal.setData(tNdbConn->ptr2int(),2);
  525.   tSignal.setData(theMyRef,3); // Set return block reference
  526.   tNdbConn->Status(NdbConnection::Connecting); // Set status to connecting
  527.   TransporterFacade *tp = TransporterFacade::instance();
  528.   if (tAction == 3) {
  529.     tp->lock_mutex();
  530.     tp->sendSignal(&tSignal, aNode);
  531.     tp->unlock_mutex();
  532.     releaseNdbCon(tNdbConn);
  533.   } else if ( (tAction == 2) || (tAction == 1) ) {
  534.     tp->lock_mutex();
  535.     tNode = tp->get_an_alive_node();
  536.     if (tNode == 0) {
  537.       theError.code = 4002;
  538.       releaseNdbCon(tNdbConn);
  539.       return -1;
  540.     }//if
  541.     ret_code = tp->sendSignal(&tSignal,aNode);
  542.     tp->unlock_mutex();
  543.     releaseNdbCon(tNdbConn);
  544.     return ret_code;
  545.   } else {
  546.     do {
  547.       tp->lock_mutex();
  548.       // Start protected area
  549.       tNode = tp->get_an_alive_node();
  550.       tp->unlock_mutex();
  551.       // End protected area
  552.       if (tNode == 0) {
  553.         theError.code = 4009;
  554.         releaseNdbCon(tNdbConn);
  555.         return -1;
  556.       }//if
  557.       ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0);
  558.       if (ret_code == 0) {  
  559.         if (tNdbConn->Status() != NdbConnection::Connected) {
  560.           theRestartGCI = 0;
  561.         }//if
  562.         releaseNdbCon(tNdbConn);
  563.         return theRestartGCI;
  564.       } else if ((ret_code == -5) || (ret_code == -2)) {
  565.         TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping");
  566.       } else {
  567.         return -1;
  568.       }//if
  569.     } while (1);
  570.   }
  571.   return 0;
  572. #endif
  573. }
  574. #if 0
  575. /****************************************************************************
  576. NdbSchemaCon* startSchemaTransaction();
  577. Return Value:   Returns a pointer to a schema connection object.
  578.                 Return NULL otherwise.
  579. Remark:         Start schema transaction. Synchronous.
  580. ****************************************************************************/ 
  581. NdbSchemaCon* 
  582. Ndb::startSchemaTransaction()
  583. {
  584.   NdbSchemaCon* tSchemaCon;
  585.   if (theSchemaConToNdbList != NULL) {
  586.     theError.code = 4321;
  587.     return NULL;
  588.   }//if
  589.   tSchemaCon = new NdbSchemaCon(this);
  590.   if (tSchemaCon == NULL) {
  591.     theError.code = 4000;
  592.     return NULL;
  593.   }//if 
  594.   theSchemaConToNdbList = tSchemaCon;
  595.   return tSchemaCon;  
  596. }
  597. /*****************************************************************************
  598. void closeSchemaTransaction(NdbSchemaCon* aSchemaCon);
  599. Parameters:     aSchemaCon: the schemacon used in the transaction.
  600. Remark:         Close transaction by releasing the schemacon and all schemaop.
  601. *****************************************************************************/
  602. void
  603. Ndb::closeSchemaTransaction(NdbSchemaCon* aSchemaCon)
  604. {
  605.   if (theSchemaConToNdbList != aSchemaCon) {
  606.     abort();
  607.     return;
  608.   }//if
  609.   aSchemaCon->release();
  610.   delete aSchemaCon;
  611.   theSchemaConToNdbList = NULL;
  612.   return;
  613. }//Ndb::closeSchemaTransaction()
  614. #endif
  615. /*****************************************************************************
  616. void RestartGCI(int aRestartGCI);
  617. Remark: Set theRestartGCI on the NDB object
  618. *****************************************************************************/
  619. void
  620. Ndb::RestartGCI(int aRestartGCI)
  621. {
  622.   theRestartGCI = aRestartGCI;
  623. }
  624. /****************************************************************************
  625. int getBlockNumber(void);
  626. Remark:
  627. ****************************************************************************/
  628. int
  629. Ndb::getBlockNumber()
  630. {
  631.   return theNdbBlockNumber;
  632. }
  633. NdbDictionary::Dictionary *
  634. Ndb::getDictionary() const {
  635.   return theDictionary;
  636. }
  637. /****************************************************************************
  638. int getNodeId();
  639. Remark:
  640. ****************************************************************************/
  641. int
  642. Ndb::getNodeId()
  643. {
  644.   return theNode;
  645. }
  646. /****************************************************************************
  647. Uint64 getTupleIdFromNdb( Uint32 aTableId, Uint32 cacheSize );
  648. Parameters:     aTableId : The TableId.
  649.                 cacheSize: Prefetch this many values
  650. Remark: Returns a new TupleId to the application.
  651.                 The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
  652.                 It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
  653. ****************************************************************************/
  654. #define DEBUG_TRACE(msg) 
  655. //  ndbout << __FILE__ << " line: " << __LINE__ << " msg: " << msg << endl
  656. Uint64
  657. Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
  658. {
  659.   DBUG_ENTER("getAutoIncrementValue");
  660.   const char * internalTableName = internalizeTableName(aTableName);
  661.   Ndb_local_table_info *info=
  662.     theDictionary->get_local_table_info(internalTableName, false);
  663.   if (info == 0)
  664.     DBUG_RETURN(~(Uint64)0);
  665.   const NdbTableImpl *table= info->m_table_impl;
  666.   Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
  667.   DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
  668.   DBUG_RETURN(tupleId);
  669. }
  670. Uint64
  671. Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable, Uint32 cacheSize)
  672. {
  673.   DBUG_ENTER("getAutoIncrementValue");
  674.   if (aTable == 0)
  675.     DBUG_RETURN(~(Uint64)0);
  676.   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  677.   Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
  678.   DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
  679.   DBUG_RETURN(tupleId);
  680. }
  681. Uint64 
  682. Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
  683. {
  684.   const NdbTableImpl* table = theDictionary->getTable(aTableName);
  685.   if (table == 0)
  686.     return ~(Uint64)0;
  687.   return getTupleIdFromNdb(table->m_tableId, cacheSize);
  688. }
  689. Uint64
  690. Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize)
  691. {
  692.   DBUG_ENTER("getTupleIdFromNdb");
  693.   if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] )
  694.   {
  695.     theFirstTupleId[aTableId]++;
  696.     DBUG_PRINT("info", ("next cached value %ul", 
  697.                         (ulong) theFirstTupleId[aTableId]));
  698.     DBUG_RETURN(theFirstTupleId[aTableId]);
  699.   }
  700.   else // theFirstTupleId == theLastTupleId
  701.   {
  702.     DBUG_PRINT("info",("reading %u values from database", 
  703.                        (cacheSize == 0) ? 1 : cacheSize));
  704.     DBUG_RETURN(opTupleIdOnNdb(aTableId, (cacheSize == 0) ? 1 : cacheSize, 0));
  705.   }
  706. }
  707. Uint64
  708. Ndb::readAutoIncrementValue(const char* aTableName)
  709. {
  710.   DBUG_ENTER("readtAutoIncrementValue");
  711.   const NdbTableImpl* table = theDictionary->getTable(aTableName);
  712.   if (table == 0) {
  713.     theError= theDictionary->getNdbError();
  714.     DBUG_RETURN(~(Uint64)0);
  715.   }
  716.   Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
  717.   DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
  718.   DBUG_RETURN(tupleId);
  719. }
  720. Uint64
  721. Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable)
  722. {
  723.   DBUG_ENTER("readtAutoIncrementValue");
  724.   if (aTable == 0)
  725.     DBUG_RETURN(~(Uint64)0);
  726.   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  727.   Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
  728.   DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
  729.   DBUG_RETURN(tupleId);
  730. }
  731. Uint64
  732. Ndb::readTupleIdFromNdb(Uint32 aTableId)
  733. {
  734.   if ( theFirstTupleId[aTableId] == theLastTupleId[aTableId] )
  735.     // Cache is empty, check next in database
  736.     return opTupleIdOnNdb(aTableId, 0, 3);
  737.   return theFirstTupleId[aTableId] + 1;
  738. }
  739. bool
  740. Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
  741. {
  742.   DEBUG_TRACE("setAutoIncrementValue " << val);
  743.   const char * internalTableName= internalizeTableName(aTableName);
  744.   Ndb_local_table_info *info=
  745.     theDictionary->get_local_table_info(internalTableName, false);
  746.   if (info == 0) {
  747.     theError= theDictionary->getNdbError();
  748.     return false;
  749.   }
  750.   const NdbTableImpl* table= info->m_table_impl;
  751.   return setTupleIdInNdb(table->m_tableId, val, increase);
  752. }
  753. bool
  754. Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val, bool increase)
  755. {
  756.   DEBUG_TRACE("setAutoIncrementValue " << val);
  757.   if (aTable == 0)
  758.     return ~(Uint64)0;
  759.   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  760.   return setTupleIdInNdb(table->m_tableId, val, increase);
  761. }
  762. bool 
  763. Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
  764. {
  765.   DEBUG_TRACE("setTupleIdInNdb");
  766.   const NdbTableImpl* table = theDictionary->getTable(aTableName);
  767.   if (table == 0) {
  768.     theError= theDictionary->getNdbError();
  769.     return false;
  770.   }
  771.   return setTupleIdInNdb(table->m_tableId, val, increase);
  772. }
  773. bool
  774. Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase )
  775. {
  776.   DEBUG_TRACE("setTupleIdInNdb");
  777.   if (increase)
  778.   {
  779.     if (theFirstTupleId[aTableId] != theLastTupleId[aTableId])
  780.     {
  781.       // We have a cache sequence
  782.       if (val <= theFirstTupleId[aTableId]+1)
  783. return false;
  784.       if (val <= theLastTupleId[aTableId])
  785.       {
  786. theFirstTupleId[aTableId] = val - 1;
  787. return true;
  788.       }
  789.       // else continue;
  790.     }    
  791.     return (opTupleIdOnNdb(aTableId, val, 2) == val);
  792.   }
  793.   else
  794.     return (opTupleIdOnNdb(aTableId, val, 1) == val);
  795. }
  796. Uint64
  797. Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
  798. {
  799.   DEBUG_TRACE("opTupleIdOnNdb");
  800.   NdbConnection*     tConnection;
  801.   NdbOperation*      tOperation;
  802.   Uint64             tValue;
  803.   NdbRecAttr*        tRecAttrResult;
  804.   int                result;
  805.   Uint64 ret;
  806.   CHECK_STATUS_MACRO_ZERO;
  807.   BaseString currentDb(getDatabaseName());
  808.   BaseString currentSchema(getDatabaseSchemaName());
  809.   setDatabaseName("sys");
  810.   setDatabaseSchemaName("def");
  811.   tConnection = this->startTransaction();
  812.   if (tConnection == NULL)
  813.     goto error_return;
  814.   if (usingFullyQualifiedNames())
  815.     tOperation = tConnection->getNdbOperation("SYSTAB_0");
  816.   else
  817.     tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
  818.   if (tOperation == NULL)
  819.     goto error_handler;
  820.   switch (op)
  821.     {
  822.     case 0:
  823.       tOperation->interpretedUpdateTuple();
  824.       tOperation->equal("SYSKEY_0", aTableId );
  825.       tOperation->incValue("NEXTID", opValue);
  826.       tRecAttrResult = tOperation->getValue("NEXTID");
  827.       if (tConnection->execute( Commit ) == -1 )
  828.         goto error_handler;
  829.       tValue = tRecAttrResult->u_64_value();
  830.       theFirstTupleId[aTableId] = tValue - opValue;
  831.       theLastTupleId[aTableId]  = tValue - 1;
  832.       ret = theFirstTupleId[aTableId];
  833.       break;
  834.     case 1:
  835.       tOperation->updateTuple();
  836.       tOperation->equal("SYSKEY_0", aTableId );
  837.       tOperation->setValue("NEXTID", opValue);
  838.       if (tConnection->execute( Commit ) == -1 )
  839.         goto error_handler;
  840.       theFirstTupleId[aTableId] = ~(Uint64)0;
  841.       theLastTupleId[aTableId]  = ~(Uint64)0;
  842.       ret = opValue;
  843.       break;
  844.     case 2:
  845.       tOperation->interpretedUpdateTuple();
  846.       tOperation->equal("SYSKEY_0", aTableId );
  847.       tOperation->load_const_u64(1, opValue);
  848.       tOperation->read_attr("NEXTID", 2);
  849.       tOperation->branch_le(2, 1, 0);
  850.       tOperation->write_attr("NEXTID", 1);
  851.       tOperation->interpret_exit_ok();
  852.       tOperation->def_label(0);
  853.       tOperation->interpret_exit_nok(9999);
  854.       
  855.       if ( (result = tConnection->execute( Commit )) == -1 )
  856.         goto error_handler;
  857.       
  858.       if (result == 9999)
  859.         ret = ~(Uint64)0;
  860.       else
  861.       {
  862.         theFirstTupleId[aTableId] = theLastTupleId[aTableId] = opValue - 1;
  863. ret = opValue;
  864.       }
  865.       break;
  866.     case 3:
  867.       tOperation->readTuple();
  868.       tOperation->equal("SYSKEY_0", aTableId );
  869.       tRecAttrResult = tOperation->getValue("NEXTID");
  870.       if (tConnection->execute( Commit ) == -1 )
  871.         goto error_handler;
  872.       ret = tRecAttrResult->u_64_value();
  873.       break;
  874.     default:
  875.       goto error_handler;
  876.     }
  877.   this->closeTransaction(tConnection);
  878.   // Restore current name space
  879.   setDatabaseName(currentDb.c_str());
  880.   setDatabaseSchemaName(currentSchema.c_str());
  881.   return ret;
  882.   error_handler:
  883.     theError.code = tConnection->theError.code;
  884.     this->closeTransaction(tConnection);
  885.   error_return:
  886.     // Restore current name space
  887.     setDatabaseName(currentDb.c_str());
  888.     setDatabaseSchemaName(currentSchema.c_str());
  889.   return ~(Uint64)0;
  890. }
  891. Uint32
  892. convertEndian(Uint32 Data)
  893. {
  894. #ifdef WORDS_BIGENDIAN
  895.   Uint32 t1, t2, t3, t4;
  896.   t4 = (Data >> 24) & 255;
  897.   t3 = (Data >> 16) & 255;
  898.   t4 = t4 + (t3 << 8);
  899.   t2 = (Data >> 8) & 255;
  900.   t4 = t4 + (t2 << 16);
  901.   t1 = Data & 255;
  902.   t4 = t4 + (t1 << 24);
  903.   return t4;
  904. #else
  905.   return Data;
  906. #endif
  907. }
  908. const char * Ndb::getCatalogName() const
  909. {
  910.   return theDataBase;
  911. }
  912.  
  913. void Ndb::setCatalogName(const char * a_catalog_name)
  914. {
  915.   if (a_catalog_name) {
  916.     BaseString::snprintf(theDataBase, sizeof(theDataBase), "%s",
  917.              a_catalog_name ? a_catalog_name : "");
  918.     
  919.     int len = BaseString::snprintf(prefixName, sizeof(prefixName), "%s%c%s%c",
  920.                        theDataBase, table_name_separator,
  921.                        theDataBaseSchema, table_name_separator);
  922.     prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len : 
  923.                               sizeof(prefixName) - 1);
  924.   }
  925. }
  926.  
  927. const char * Ndb::getSchemaName() const
  928. {
  929.   return theDataBaseSchema;
  930. }
  931.  
  932. void Ndb::setSchemaName(const char * a_schema_name)
  933. {
  934.   if (a_schema_name) {
  935.     BaseString::snprintf(theDataBaseSchema, sizeof(theDataBase), "%s",
  936.              a_schema_name ? a_schema_name : "");
  937.     int len = BaseString::snprintf(prefixName, sizeof(prefixName), "%s%c%s%c",
  938.                        theDataBase, table_name_separator,
  939.                        theDataBaseSchema, table_name_separator);
  940.     prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len : 
  941.                               sizeof(prefixName) - 1);
  942.   }
  943. }
  944.  
  945. /*
  946. Deprecated functions
  947. */
  948. const char * Ndb::getDatabaseName() const
  949. {
  950.   return getCatalogName();
  951. }
  952.  
  953. void Ndb::setDatabaseName(const char * a_catalog_name)
  954. {
  955.   setCatalogName(a_catalog_name);
  956. }
  957.  
  958. const char * Ndb::getDatabaseSchemaName() const
  959. {
  960.   return getSchemaName();
  961. }
  962.  
  963. void Ndb::setDatabaseSchemaName(const char * a_schema_name)
  964. {
  965.   setSchemaName(a_schema_name);
  966. }
  967.  
  968. bool Ndb::usingFullyQualifiedNames()
  969. {
  970.   return fullyQualifiedNames;
  971. }
  972.  
  973. const char *
  974. Ndb::externalizeTableName(const char * internalTableName, bool fullyQualifiedNames)
  975. {
  976.   if (fullyQualifiedNames) {
  977.     register const char *ptr = internalTableName;
  978.    
  979.     // Skip database name
  980.     while (*ptr && *ptr++ != table_name_separator);
  981.     // Skip schema name
  982.     while (*ptr && *ptr++ != table_name_separator);
  983.     return ptr;
  984.   }
  985.   else
  986.     return internalTableName;
  987. }
  988. const char *
  989. Ndb::externalizeTableName(const char * internalTableName)
  990. {
  991.   return externalizeTableName(internalTableName, usingFullyQualifiedNames());
  992. }
  993. const char *
  994. Ndb::externalizeIndexName(const char * internalIndexName, bool fullyQualifiedNames)
  995. {
  996.   if (fullyQualifiedNames) {
  997.     register const char *ptr = internalIndexName;
  998.    
  999.     // Scan name from the end
  1000.     while (*ptr++); ptr--; // strend
  1001.     while (ptr >= internalIndexName && *ptr != table_name_separator)
  1002.       ptr--;
  1003.      
  1004.     return ptr + 1;
  1005.   }
  1006.   else
  1007.     return internalIndexName;
  1008. }
  1009. const char *
  1010. Ndb::externalizeIndexName(const char * internalIndexName)
  1011. {
  1012.   return externalizeIndexName(internalIndexName, usingFullyQualifiedNames());
  1013. }
  1014. const char *
  1015. Ndb::internalizeTableName(const char * externalTableName)
  1016. {
  1017.   if (fullyQualifiedNames) {
  1018.     strncpy(prefixEnd, externalTableName, NDB_MAX_TAB_NAME_SIZE);
  1019.     return prefixName;
  1020.   }
  1021.   else
  1022.     return externalTableName;
  1023. }
  1024.  
  1025. const char *
  1026. Ndb::internalizeIndexName(const NdbTableImpl * table,
  1027.                           const char * externalIndexName)
  1028. {
  1029.   if (fullyQualifiedNames) {
  1030.     char tableId[10];
  1031.     sprintf(tableId, "%d", table->m_tableId);
  1032.     Uint32 tabIdLen = strlen(tableId);
  1033.     strncpy(prefixEnd, tableId, tabIdLen);
  1034.     prefixEnd[tabIdLen] = table_name_separator;
  1035.     strncpy(prefixEnd + tabIdLen + 1, 
  1036.     externalIndexName, NDB_MAX_TAB_NAME_SIZE);
  1037.     return prefixName;
  1038.   }
  1039.   else
  1040.     return externalIndexName;
  1041. }
  1042. const BaseString
  1043. Ndb::getDatabaseFromInternalName(const char * internalName)
  1044. {
  1045.   char * databaseName = new char[strlen(internalName) + 1];
  1046.   strcpy(databaseName, internalName);
  1047.   register char *ptr = databaseName;
  1048.    
  1049.   /* Scan name for the first table_name_separator */
  1050.   while (*ptr && *ptr != table_name_separator)
  1051.     ptr++;
  1052.   *ptr = '';
  1053.   BaseString ret = BaseString(databaseName);
  1054.   delete [] databaseName;
  1055.   return ret;
  1056. }
  1057.  
  1058. const BaseString
  1059. Ndb::getSchemaFromInternalName(const char * internalName)
  1060. {
  1061.   char * schemaName = new char[strlen(internalName)];
  1062.   register const char *ptr1 = internalName;
  1063.    
  1064.   /* Scan name for the second table_name_separator */
  1065.   while (*ptr1 && *ptr1 != table_name_separator)
  1066.     ptr1++;
  1067.   strcpy(schemaName, ptr1 + 1);
  1068.   register char *ptr = schemaName;
  1069.   while (*ptr && *ptr != table_name_separator)
  1070.     ptr++;
  1071.   *ptr = '';
  1072.   BaseString ret = BaseString(schemaName);
  1073.   delete [] schemaName;
  1074.   return ret;
  1075. }
  1076. NdbEventOperation* Ndb::createEventOperation(const char* eventName,
  1077.      const int bufferLength)
  1078. {
  1079.   NdbEventOperation* tOp;
  1080.   tOp = new NdbEventOperation(this, eventName, bufferLength);
  1081.   if (tOp->getState() != NdbEventOperation::CREATED) {
  1082.     delete tOp;
  1083.     tOp = NULL;
  1084.   }
  1085.   //now we have to look up this event in dict
  1086.   return tOp;
  1087. }
  1088. int Ndb::dropEventOperation(NdbEventOperation* op) {
  1089.   delete op;
  1090.   return 0;
  1091. }
  1092. NdbGlobalEventBufferHandle* Ndb::getGlobalEventBufferHandle()
  1093. {
  1094.   return theGlobalEventBufferHandle;
  1095. }
  1096. //void Ndb::monitorEvent(NdbEventOperation *op, NdbEventCallback cb, void* rs)
  1097. //{
  1098. //}
  1099. int
  1100. Ndb::pollEvents(int aMillisecondNumber)
  1101. {
  1102.   return NdbEventOperation::wait(theGlobalEventBufferHandle,
  1103.  aMillisecondNumber);
  1104. }
  1105. #ifdef VM_TRACE
  1106. #include <NdbMutex.h>
  1107. extern NdbMutex *ndb_print_state_mutex;
  1108. static bool
  1109. checkdups(NdbConnection** list, unsigned no)
  1110. {
  1111.   for (unsigned i = 0; i < no; i++)
  1112.     for (unsigned j = i + 1; j < no; j++)
  1113.       if (list[i] == list[j])
  1114.         return true;
  1115.   return false;
  1116. }
  1117. void
  1118. Ndb::printState(const char* fmt, ...)
  1119. {
  1120.   char buf[200];
  1121.   va_list ap;
  1122.   va_start(ap, fmt);
  1123.   vsprintf(buf, fmt, ap);
  1124.   va_end(ap);
  1125.   NdbMutex_Lock(ndb_print_state_mutex);
  1126.   bool dups = false;
  1127.   unsigned i;
  1128.   ndbout << buf << " ndb=" << hex << this << dec;
  1129. #ifndef NDB_WIN32
  1130.   ndbout << " thread=" << (int)pthread_self();
  1131. #endif
  1132.   ndbout << endl;
  1133.   for (unsigned n = 0; n < MAX_NDB_NODES; n++) {
  1134.     NdbConnection* con = theConnectionArray[n];
  1135.     if (con != 0) {
  1136.       ndbout << "conn " << n << ":" << endl;
  1137.       while (con != 0) {
  1138.         con->printState();
  1139.         con = con->theNext;
  1140.       }
  1141.     }
  1142.   }
  1143.   ndbout << "prepared: " << theNoOfPreparedTransactions<< endl;
  1144.   if (checkdups(thePreparedTransactionsArray, theNoOfPreparedTransactions)) {
  1145.     ndbout << "!! DUPS !!" << endl;
  1146.     dups = true;
  1147.   }
  1148.   for (i = 0; i < theNoOfPreparedTransactions; i++)
  1149.     thePreparedTransactionsArray[i]->printState();
  1150.   ndbout << "sent: " << theNoOfSentTransactions<< endl;
  1151.   if (checkdups(theSentTransactionsArray, theNoOfSentTransactions)) {
  1152.     ndbout << "!! DUPS !!" << endl;
  1153.     dups = true;
  1154.   }
  1155.   for (i = 0; i < theNoOfSentTransactions; i++)
  1156.     theSentTransactionsArray[i]->printState();
  1157.   ndbout << "completed: " << theNoOfCompletedTransactions<< endl;
  1158.   if (checkdups(theCompletedTransactionsArray, theNoOfCompletedTransactions)) {
  1159.     ndbout << "!! DUPS !!" << endl;
  1160.     dups = true;
  1161.   }
  1162.   for (i = 0; i < theNoOfCompletedTransactions; i++)
  1163.     theCompletedTransactionsArray[i]->printState();
  1164.   NdbMutex_Unlock(ndb_print_state_mutex);
  1165. }
  1166. #endif