ndb_async2.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:22k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. //#define DEBUG_ON
  14. #include <string.h>
  15. #include "userInterface.h"
  16. #include "macros.h"
  17. #include "ndb_schema.hpp"
  18. #include "ndb_error.hpp"
  19. #include <NdbSleep.h>
  20. #include <NdbApi.hpp>
  21. void T1_Callback(int result, NdbConnection * pCon, void * threadData);
  22. void T2_Callback(int result, NdbConnection * pCon, void * threadData);
  23. void T3_Callback_1(int result, NdbConnection * pCon, void * threadData);
  24. void T3_Callback_2(int result, NdbConnection * pCon, void * threadData);
  25. void T3_Callback_3(int result, NdbConnection * pCon, void * threadData);
  26. void T4_Callback_1(int result, NdbConnection * pCon, void * threadData);
  27. void T4_Callback_2(int result, NdbConnection * pCon, void * threadData);
  28. void T4_Callback_3(int result, NdbConnection * pCon, void * threadData);
  29. void T5_Callback_1(int result, NdbConnection * pCon, void * threadData);
  30. void T5_Callback_2(int result, NdbConnection * pCon, void * threadData);
  31. void T5_Callback_3(int result, NdbConnection * pCon, void * threadData);
  32. static int stat_async = 0;
  33. /**
  34.  * Transaction 1 - T1 
  35.  *
  36.  * Update location and changed by/time on a subscriber
  37.  *
  38.  * Input: 
  39.  *   SubscriberNumber,
  40.  *   Location,
  41.  *   ChangedBy,
  42.  *   ChangedTime
  43.  *
  44.  * Output:
  45.  */
  46. #define SFX_START (SUBSCRIBER_NUMBER_LENGTH - SUBSCRIBER_NUMBER_SUFFIX_LENGTH)
  47. inline
  48. NdbConnection *
  49. startTransaction(Ndb * pNDB, ThreadData * td){
  50.   return pNDB->startTransaction();
  51. #ifdef OLD_CODE
  52.   return pNDB->startTransactionDGroup (0, 
  53.        &td->transactionData.number[SFX_START],
  54.        1);
  55. #endif
  56. }
  57. void
  58. start_T1(Ndb * pNDB, ThreadData * td, int async){
  59.   DEBUG2("T1(%.*s): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  60.  td->transactionData.number); 
  61.   NdbConnection * pCON = 0;
  62.   while((pCON = startTransaction(pNDB, td)) == 0){
  63.     CHECK_ALLOWED_ERROR("T1: startTransaction", td, pNDB->getNdbError());
  64.     NdbSleep_MilliSleep(10);
  65.   }
  66.   NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
  67.   if (MyOp != NULL) {  
  68.     MyOp->updateTuple();  
  69.     MyOp->equal(IND_SUBSCRIBER_NUMBER, 
  70. td->transactionData.number);
  71.     MyOp->setValue(IND_SUBSCRIBER_LOCATION, 
  72.    (char *)&td->transactionData.location);
  73.     MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY, 
  74.    td->transactionData.changed_by);
  75.     MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME, 
  76.    td->transactionData.changed_time);
  77.     if (async == 1) {
  78.       pCON->executeAsynchPrepare( Commit , T1_Callback, td);
  79.     } else {
  80.       int result = pCON->execute(Commit);
  81.       T1_Callback(result, pCON, (void*)td);
  82.       return;
  83.     }//if
  84.   } else {
  85.     CHECK_NULL(MyOp, "T1: getNdbOperation", td, pCON->getNdbError());
  86.   }//if
  87. }
  88. void
  89. T1_Callback(int result, NdbConnection * pCON, void * threadData) {
  90.   ThreadData * td = (ThreadData *)threadData;
  91.   
  92.   DEBUG2("T1(%.*s): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
  93.  td->transactionData.number); 
  94.   if (result == -1) {
  95.     CHECK_ALLOWED_ERROR("T1: Commit", td, pCON->getNdbError());
  96.     td->pNDB->closeTransaction(pCON);
  97.     start_T1(td->pNDB, td, stat_async);
  98.     return;
  99.   }//if
  100.   td->pNDB->closeTransaction(pCON);
  101.   complete_T1(td);
  102. }
  103. /**
  104.  * Transaction 2 - T2
  105.  *
  106.  * Read from Subscriber:
  107.  *
  108.  * Input: 
  109.  *   SubscriberNumber
  110.  *
  111.  * Output:
  112.  *   Location
  113.  *   Changed by
  114.  *   Changed Timestamp
  115.  *   Name
  116.  */
  117. void
  118. start_T2(Ndb * pNDB, ThreadData * td, int async){
  119.   DEBUG3("T2(%.*s, %d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  120.  td->transactionData.number, 
  121.  td->transactionData.location);
  122.   
  123.   NdbConnection * pCON = 0;
  124.   
  125.   while((pCON = startTransaction(pNDB, td)) == 0){
  126.     CHECK_ALLOWED_ERROR("T2-1: startTransaction", td, pNDB->getNdbError());
  127.     NdbSleep_MilliSleep(10);
  128.   }
  129.   NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
  130.   CHECK_NULL(MyOp, "T2: getNdbOperation", td,
  131.      pCON->getNdbError());
  132.   
  133.   MyOp->readTuple();
  134.   MyOp->equal(IND_SUBSCRIBER_NUMBER,
  135.       td->transactionData.number);
  136.   MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
  137.  (char *)&td->transactionData.location);
  138.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
  139.  td->transactionData.changed_by);
  140.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
  141.  td->transactionData.changed_time);
  142.   MyOp->getValue(IND_SUBSCRIBER_NAME, 
  143.  td->transactionData.name);
  144.   if (async == 1) {
  145.     pCON->executeAsynchPrepare( Commit , T2_Callback, td);
  146.   } else {
  147.     int result = pCON->execute(Commit);
  148.     T2_Callback(result, pCON, (void*)td);
  149.     return;
  150.   }//if
  151. }
  152. void
  153. T2_Callback(int result, NdbConnection * pCON, void * threadData){
  154.   ThreadData * td = (ThreadData *)threadData;
  155.   DEBUG3("T2(%.*s, %d): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
  156.  td->transactionData.number, 
  157.  td->transactionData.location);
  158.   
  159.   if (result == -1) {
  160.     CHECK_ALLOWED_ERROR("T2: Commit", td, pCON->getNdbError());
  161.     td->pNDB->closeTransaction(pCON);
  162.     start_T2(td->pNDB, td, stat_async);
  163.     return;
  164.   }//if
  165.   td->pNDB->closeTransaction(pCON);
  166.   complete_T2(td);
  167. }
  168. /**
  169.  * Transaction 3 - T3
  170.  *
  171.  * Read session details
  172.  *
  173.  * Input:
  174.  *   SubscriberNumber
  175.  *   ServerId
  176.  *   ServerBit
  177.  *
  178.  * Output:
  179.  *   BranchExecuted
  180.  *   SessionDetails
  181.  *   ChangedBy
  182.  *   ChangedTime
  183.  *   Location
  184.  */
  185. void
  186. start_T3(Ndb * pNDB, ThreadData * td, int async){
  187.   DEBUG3("T3(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  188.  td->transactionData.number, 
  189.  td->transactionData.server_id);
  190.   
  191.   NdbConnection * pCON = 0;
  192.   while((pCON = startTransaction(pNDB, td)) == 0){
  193.     CHECK_ALLOWED_ERROR("T3-1: startTransaction", td, pNDB->getNdbError());
  194.     NdbSleep_MilliSleep(10);
  195.   }
  196.   
  197.   NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
  198.   CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
  199.      pCON->getNdbError());
  200.   
  201.   MyOp->readTuple();
  202.   MyOp->equal(IND_SUBSCRIBER_NUMBER, 
  203.       td->transactionData.number);
  204.   MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
  205.  (char *)&td->transactionData.location);
  206.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
  207.  td->transactionData.changed_by);
  208.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
  209.  td->transactionData.changed_time);
  210.   MyOp->getValue(IND_SUBSCRIBER_GROUP, 
  211.  (char *)&td->transactionData.group_id);
  212.   MyOp->getValue(IND_SUBSCRIBER_SESSIONS, 
  213.  (char *)&td->transactionData.sessions);
  214.   stat_async = async;
  215.   if (async == 1) {
  216.     pCON->executeAsynchPrepare( NoCommit , T3_Callback_1, td);
  217.   } else {
  218.     int result = pCON->execute( NoCommit );
  219.     T3_Callback_1(result, pCON, (void*)td);
  220.     return;
  221.   }//if
  222. }
  223. void
  224. T3_Callback_1(int result, NdbConnection * pCON, void * threadData){
  225.   ThreadData * td = (ThreadData *)threadData;
  226.   DEBUG3("T3(%.*s, %.2d): - Callback 1", SUBSCRIBER_NUMBER_LENGTH, 
  227.  td->transactionData.number, 
  228.  td->transactionData.server_id);
  229.   if (result == -1) {
  230.     CHECK_ALLOWED_ERROR("T3-1: execute", td, pCON->getNdbError());
  231.     td->pNDB->closeTransaction(pCON);
  232.     start_T3(td->pNDB, td, stat_async);
  233.     return;
  234.   }//if
  235.   NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
  236.   CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
  237.      pCON->getNdbError());
  238.     
  239.   MyOp->readTuple();
  240.   MyOp->equal(IND_GROUP_ID,
  241.       (char*)&td->transactionData.group_id);
  242.   MyOp->getValue(IND_GROUP_ALLOW_READ, 
  243.  (char *)&td->transactionData.permission);
  244.   if (stat_async == 1) {
  245.     pCON->executeAsynchPrepare( NoCommit , T3_Callback_2, td);
  246.   } else {
  247.     int result = pCON->execute( NoCommit );
  248.     T3_Callback_2(result, pCON, (void*)td);
  249.     return;
  250.   }//if
  251. }
  252. void
  253. T3_Callback_2(int result, NdbConnection * pCON, void * threadData){
  254.   ThreadData * td = (ThreadData *)threadData;
  255.   
  256.   if (result == -1) {
  257.     CHECK_ALLOWED_ERROR("T3-2: execute", td, pCON->getNdbError());
  258.     td->pNDB->closeTransaction(pCON);
  259.     start_T3(td->pNDB, td, stat_async);
  260.     return;
  261.   }//if
  262.   
  263.   Uint32 permission = td->transactionData.permission;
  264.   Uint32 sessions   = td->transactionData.sessions;
  265.   Uint32 server_bit = td->transactionData.server_bit;
  266.   if(((permission & server_bit) == server_bit) &&
  267.      ((sessions   & server_bit) == server_bit)){
  268.     
  269.     memcpy(td->transactionData.suffix,
  270.    &td->transactionData.number[SFX_START],
  271.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
  272.     DEBUG5("T3(%.*s, %.2d): - Callback 2 - reading(%.*s)", 
  273.    SUBSCRIBER_NUMBER_LENGTH, 
  274.    td->transactionData.number, 
  275.    td->transactionData.server_id,
  276.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
  277.    td->transactionData.suffix);
  278.     
  279.     /* Operation 3 */
  280.     NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
  281.     CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
  282.        pCON->getNdbError());
  283.     
  284.     MyOp->simpleRead();
  285.     MyOp->equal(IND_SESSION_SUBSCRIBER,
  286. (char*)td->transactionData.number);
  287.     MyOp->equal(IND_SESSION_SERVER,
  288. (char*)&td->transactionData.server_id);
  289.     MyOp->getValue(IND_SESSION_DATA, 
  290.    (char *)td->transactionData.session_details);
  291.     
  292.     /* Operation 4 */
  293.     MyOp = pCON->getNdbOperation(SERVER_TABLE);
  294.     CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
  295.        pCON->getNdbError());
  296.     
  297.     MyOp->interpretedUpdateTuple();
  298.     MyOp->equal(IND_SERVER_ID,
  299. (char*)&td->transactionData.server_id);
  300.     MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
  301. (char*)td->transactionData.suffix);
  302.     MyOp->incValue(IND_SERVER_READS, (uint32)1);
  303.     td->transactionData.branchExecuted = 1;
  304.   } else {
  305.     DEBUG3("T3(%.*s, %.2d): - Callback 2 - no read",
  306.    SUBSCRIBER_NUMBER_LENGTH, 
  307.    td->transactionData.number, 
  308.    td->transactionData.server_id);
  309.     td->transactionData.branchExecuted = 0;
  310.   }
  311.   if (stat_async == 1) {
  312.     pCON->executeAsynchPrepare( Commit , T3_Callback_3, td);
  313.   } else {
  314.     int result = pCON->execute( Commit );
  315.     T3_Callback_3(result, pCON, (void*)td);
  316.     return;
  317.   }//if
  318. }
  319. void
  320. T3_Callback_3(int result, NdbConnection * pCON, void * threadData){
  321.   ThreadData * td = (ThreadData *)threadData;  
  322.   DEBUG3("T3(%.*s, %.2d): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
  323.  td->transactionData.number, 
  324.  td->transactionData.server_id);
  325.   
  326.   if (result == -1) {
  327.     CHECK_ALLOWED_ERROR("T3-3: Commit", td, pCON->getNdbError());
  328.     td->pNDB->closeTransaction(pCON);
  329.     start_T3(td->pNDB, td, stat_async);
  330.     return;
  331.   }//if
  332.   td->pNDB->closeTransaction(pCON);
  333.   complete_T3(td);
  334. }
  335. /**
  336.  * Transaction 4 - T4
  337.  * 
  338.  * Create session
  339.  *
  340.  * Input:
  341.  *   SubscriberNumber
  342.  *   ServerId
  343.  *   ServerBit
  344.  *   SessionDetails,
  345.  *   DoRollback
  346.  * Output:
  347.  *   ChangedBy
  348.  *   ChangedTime
  349.  *   Location
  350.  *   BranchExecuted
  351.  */
  352. void
  353. start_T4(Ndb * pNDB, ThreadData * td, int async){
  354.   DEBUG3("T4(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  355.  td->transactionData.number, 
  356.  td->transactionData.server_id);
  357.   
  358.   NdbConnection * pCON = 0;
  359.   while((pCON = startTransaction(pNDB, td)) == 0){
  360.     CHECK_ALLOWED_ERROR("T4-1: startTransaction", td, pNDB->getNdbError());
  361.     NdbSleep_MilliSleep(10);
  362.   }
  363.   
  364.   NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
  365.   CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
  366.      pCON->getNdbError());
  367.   
  368.   MyOp->interpretedUpdateTuple();
  369.   MyOp->equal(IND_SUBSCRIBER_NUMBER, 
  370.       td->transactionData.number);
  371.   MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
  372.  (char *)&td->transactionData.location);
  373.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
  374.  td->transactionData.changed_by);
  375.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
  376.  td->transactionData.changed_time);
  377.   MyOp->getValue(IND_SUBSCRIBER_GROUP,
  378.  (char *)&td->transactionData.group_id);
  379.   MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
  380.  (char *)&td->transactionData.sessions); 
  381.   MyOp->incValue(IND_SUBSCRIBER_SESSIONS, 
  382.  (uint32)td->transactionData.server_bit);
  383.   stat_async = async;
  384.   if (async == 1) {
  385.     pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td);
  386.   } else {
  387.     int result = pCON->execute( NoCommit );
  388.     T4_Callback_1(result, pCON, (void*)td);
  389.     return;
  390.   }//if
  391. }
  392. void
  393. T4_Callback_1(int result, NdbConnection * pCON, void * threadData){
  394.   ThreadData * td = (ThreadData *)threadData;  
  395.   if (result == -1) {
  396.     CHECK_ALLOWED_ERROR("T4-1: execute", td, pCON->getNdbError());
  397.     td->pNDB->closeTransaction(pCON);
  398.     start_T4(td->pNDB, td, stat_async);
  399.     return;
  400.   }//if
  401.   
  402.   DEBUG3("T4(%.*s, %.2d): - Callback 1", 
  403.  SUBSCRIBER_NUMBER_LENGTH, 
  404.  td->transactionData.number, 
  405.  td->transactionData.server_id);
  406.   NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
  407.   CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
  408.      pCON->getNdbError());
  409.   
  410.   MyOp->readTuple();
  411.   MyOp->equal(IND_GROUP_ID,
  412.       (char*)&td->transactionData.group_id);
  413.   MyOp->getValue(IND_GROUP_ALLOW_INSERT, 
  414.  (char *)&td->transactionData.permission);
  415.   if (stat_async == 1) {
  416.     pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td);
  417.   } else {
  418.     int result = pCON->execute( NoCommit );
  419.     T4_Callback_2(result, pCON, (void*)td);
  420.     return;
  421.   }//if
  422. }
  423. void
  424. T4_Callback_2(int result, NdbConnection * pCON, void * threadData){
  425.   ThreadData * td = (ThreadData *)threadData;  
  426.   if (result == -1) {
  427.     CHECK_ALLOWED_ERROR("T4-2: execute", td, pCON->getNdbError());
  428.     td->pNDB->closeTransaction(pCON);
  429.     start_T4(td->pNDB, td, stat_async);
  430.     return;
  431.   }//if
  432.   Uint32 permission = td->transactionData.permission;
  433.   Uint32 sessions   = td->transactionData.sessions;
  434.   Uint32 server_bit = td->transactionData.server_bit;
  435.   
  436.   if(((permission & server_bit) == server_bit) &&
  437.      ((sessions   & server_bit) == 0)){
  438.     
  439.     memcpy(td->transactionData.suffix,
  440.    &td->transactionData.number[SFX_START],
  441.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
  442.     
  443.     DEBUG5("T4(%.*s, %.2d): - Callback 2 - inserting(%.*s)", 
  444.    SUBSCRIBER_NUMBER_LENGTH, 
  445.    td->transactionData.number, 
  446.    td->transactionData.server_id,
  447.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
  448.    td->transactionData.suffix);
  449.     
  450.     /* Operation 3 */
  451.     
  452.     NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
  453.     CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
  454.        pCON->getNdbError());
  455.     
  456.     MyOp->insertTuple();
  457.     MyOp->equal(IND_SESSION_SUBSCRIBER,
  458. (char*)td->transactionData.number);
  459.     MyOp->equal(IND_SESSION_SERVER,
  460. (char*)&td->transactionData.server_id);
  461.     MyOp->setValue(SESSION_DATA, 
  462.    (char *)td->transactionData.session_details);
  463.     /* Operation 4 */
  464.     
  465.     /* Operation 5 */
  466.     MyOp = pCON->getNdbOperation(SERVER_TABLE);
  467.     CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
  468.        pCON->getNdbError());
  469.     
  470.     MyOp->interpretedUpdateTuple();
  471.     MyOp->equal(IND_SERVER_ID,
  472. (char*)&td->transactionData.server_id);
  473.     MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
  474. (char*)td->transactionData.suffix);
  475.     MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
  476.     td->transactionData.branchExecuted = 1;
  477.   } else {
  478.     td->transactionData.branchExecuted = 0;
  479.     DEBUG5("T4(%.*s, %.2d): - Callback 2 - %s %s",
  480.    SUBSCRIBER_NUMBER_LENGTH, 
  481.    td->transactionData.number, 
  482.    td->transactionData.server_id,
  483.    ((permission & server_bit) ? 
  484.     "permission - " : "no permission - "),
  485.    ((sessions   & server_bit) ? 
  486.     "in session - " : "no in session - "));
  487.   }
  488.   
  489.   if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
  490.     if (stat_async == 1) {
  491.       pCON->executeAsynchPrepare( Commit , T4_Callback_3, td);
  492.     } else {
  493.       int result = pCON->execute( Commit );
  494.       T4_Callback_3(result, pCON, (void*)td);
  495.       return;
  496.     }//if
  497.   } else {
  498.     if (stat_async == 1) {
  499.       pCON->executeAsynchPrepare( Rollback , T4_Callback_3, td);
  500.     } else {
  501.       int result = pCON->execute( Rollback );
  502.       T4_Callback_3(result, pCON, (void*)td);
  503.       return;
  504.     }//if
  505.   }
  506. }
  507. void
  508. T4_Callback_3(int result, NdbConnection * pCON, void * threadData){
  509.   ThreadData * td = (ThreadData *)threadData;  
  510.   if (result == -1) {
  511.     CHECK_ALLOWED_ERROR("T4-3: Commit", td, pCON->getNdbError());
  512.     td->pNDB->closeTransaction(pCON);
  513.     start_T4(td->pNDB, td, stat_async);
  514.     return;
  515.   }//if
  516.   
  517.   DEBUG3("T4(%.*s, %.2d): - Completing", 
  518.  SUBSCRIBER_NUMBER_LENGTH, 
  519.  td->transactionData.number, 
  520.  td->transactionData.server_id);
  521.   td->pNDB->closeTransaction(pCON);
  522.   complete_T4(td);
  523. }
  524. /**
  525.  * Transaction 5 - T5
  526.  * 
  527.  * Delete session
  528.  *
  529.  * Input:
  530.  *   SubscriberNumber
  531.  *   ServerId
  532.  *   ServerBit
  533.  *   DoRollback
  534.  * Output:
  535.  *   ChangedBy
  536.  *   ChangedTime
  537.  *   Location
  538.  *   BranchExecuted
  539.  */
  540. void
  541. start_T5(Ndb * pNDB, ThreadData * td, int async){
  542.   DEBUG3("T5(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  543.  td->transactionData.number, 
  544.  td->transactionData.server_id);
  545.   NdbConnection * pCON = 0;
  546.   while((pCON = startTransaction(pNDB, td)) == 0){
  547.     CHECK_ALLOWED_ERROR("T5-1: startTransaction", td, pNDB->getNdbError());
  548.     NdbSleep_MilliSleep(10);
  549.   }
  550.   
  551.   NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
  552.   CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
  553.      pCON->getNdbError());
  554.   
  555.   MyOp->interpretedUpdateTuple();
  556.   MyOp->equal(IND_SUBSCRIBER_NUMBER, 
  557.       td->transactionData.number);
  558.   MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
  559.  (char *)&td->transactionData.location);
  560.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
  561.  td->transactionData.changed_by);
  562.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
  563.  td->transactionData.changed_time);
  564.   MyOp->getValue(IND_SUBSCRIBER_GROUP,
  565.  (char *)&td->transactionData.group_id);
  566.   MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
  567.  (char *)&td->transactionData.sessions);
  568.   MyOp->subValue(IND_SUBSCRIBER_SESSIONS, 
  569.  (uint32)td->transactionData.server_bit);
  570.   stat_async = async;
  571.   if (async == 1) {
  572.     pCON->executeAsynchPrepare( NoCommit , T5_Callback_1, td);
  573.   } else {
  574.     int result = pCON->execute( NoCommit );
  575.     T5_Callback_1(result, pCON, (void*)td);
  576.     return;
  577.   }//if
  578. }
  579. void
  580. T5_Callback_1(int result, NdbConnection * pCON, void * threadData){
  581.   ThreadData * td = (ThreadData *)threadData;  
  582.   if (result == -1) {
  583.     CHECK_ALLOWED_ERROR("T5-1: execute", td, pCON->getNdbError());
  584.     td->pNDB->closeTransaction(pCON);
  585.     start_T5(td->pNDB, td, stat_async);
  586.     return;
  587.   }//if
  588.   DEBUG3("T5(%.*s, %.2d): - Callback 1", 
  589.  SUBSCRIBER_NUMBER_LENGTH, 
  590.  td->transactionData.number, 
  591.  td->transactionData.server_id);
  592.   
  593.   NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
  594.   CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
  595.      pCON->getNdbError());
  596.   
  597.   MyOp->readTuple();
  598.   MyOp->equal(IND_GROUP_ID,
  599.       (char*)&td->transactionData.group_id);
  600.   MyOp->getValue(IND_GROUP_ALLOW_DELETE, 
  601.  (char *)&td->transactionData.permission);
  602.   if (stat_async == 1) {
  603.     pCON->executeAsynchPrepare( NoCommit , T5_Callback_2, td);
  604.   } else {
  605.     int result = pCON->execute( NoCommit );
  606.     T5_Callback_2(result, pCON, (void*)td);
  607.     return;
  608.   }//if
  609. }
  610. void
  611. T5_Callback_2(int result, NdbConnection * pCON, void * threadData){
  612.   ThreadData * td = (ThreadData *)threadData;  
  613.   if (result == -1) {
  614.     CHECK_ALLOWED_ERROR("T5-2: execute", td, pCON->getNdbError());
  615.     td->pNDB->closeTransaction(pCON);
  616.     start_T5(td->pNDB, td, stat_async);
  617.     return;
  618.   }//if
  619.   Uint32 permission = td->transactionData.permission;
  620.   Uint32 sessions   = td->transactionData.sessions;
  621.   Uint32 server_bit = td->transactionData.server_bit;
  622.   if(((permission & server_bit) == server_bit) &&
  623.      ((sessions   & server_bit) == server_bit)){
  624.     
  625.     memcpy(td->transactionData.suffix,
  626.    &td->transactionData.number[SFX_START],
  627.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
  628.     
  629.     DEBUG5("T5(%.*s, %.2d): - Callback 2 - deleting(%.*s)", 
  630.    SUBSCRIBER_NUMBER_LENGTH, 
  631.    td->transactionData.number, 
  632.    td->transactionData.server_id,
  633.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
  634.    td->transactionData.suffix);
  635.     
  636.     /* Operation 3 */
  637.     NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
  638.     CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
  639.        pCON->getNdbError());
  640.     
  641.     MyOp->deleteTuple();
  642.     MyOp->equal(IND_SESSION_SUBSCRIBER,
  643. (char*)td->transactionData.number);
  644.     MyOp->equal(IND_SESSION_SERVER,
  645. (char*)&td->transactionData.server_id);
  646.     /* Operation 4 */
  647.     
  648.     /* Operation 5 */
  649.     MyOp = pCON->getNdbOperation(SERVER_TABLE);
  650.     CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
  651.        pCON->getNdbError());
  652.     
  653.     MyOp->interpretedUpdateTuple();
  654.     MyOp->equal(IND_SERVER_ID,
  655. (char*)&td->transactionData.server_id);
  656.     MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
  657. (char*)td->transactionData.suffix);
  658.     MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
  659.     td->transactionData.branchExecuted = 1;
  660.   } else {
  661.     td->transactionData.branchExecuted = 0;
  662.     DEBUG5("T5(%.*s, %.2d): - Callback 2 - no delete - %s %s", 
  663.    SUBSCRIBER_NUMBER_LENGTH, 
  664.    td->transactionData.number, 
  665.    td->transactionData.server_id,
  666.    ((permission & server_bit) ? 
  667.     "permission - " : "no permission - "),
  668.    ((sessions   & server_bit) ? 
  669.     "in session - " : "no in session - "));
  670.   }
  671.   
  672.   if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
  673.     if (stat_async == 1) {
  674.       pCON->executeAsynchPrepare( Commit , T5_Callback_3, td);
  675.     } else {
  676.       int result = pCON->execute( Commit );
  677.       T5_Callback_3(result, pCON, (void*)td);
  678.       return;
  679.     }//if
  680.   } else {
  681.     if (stat_async == 1) {
  682.       pCON->executeAsynchPrepare( Rollback , T5_Callback_3, td);
  683.     } else {
  684.       int result = pCON->execute( Rollback );
  685.       T5_Callback_3(result, pCON, (void*)td);
  686.       return;
  687.     }//if
  688.   }
  689. }
  690. void
  691. T5_Callback_3(int result, NdbConnection * pCON, void * threadData){
  692.   ThreadData * td = (ThreadData *)threadData;  
  693.   if (result == -1) {
  694.     CHECK_ALLOWED_ERROR("T5-3: Commit", td, pCON->getNdbError());
  695.     td->pNDB->closeTransaction(pCON);
  696.     start_T5(td->pNDB, td, stat_async);
  697.     return;
  698.   }//if
  699.   
  700.   DEBUG3("T5(%.*s, %.2d): - Completing", 
  701.  SUBSCRIBER_NUMBER_LENGTH, 
  702.  td->transactionData.number, 
  703.  td->transactionData.server_id);
  704.   
  705.   td->pNDB->closeTransaction(pCON);
  706.   complete_T5(td);
  707. }