ndb_async2.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:22k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. //#define DEBUG_ON
  14. #include <string.h>
  15. #include "userInterface.h"
  16. #include "macros.h"
  17. #include "ndb_schema.hpp"
  18. #include "ndb_error.hpp"
  19. #include <NdbSleep.h>
  20. #include <NdbApi.hpp>
  21. void T1_Callback(int result, NdbConnection * pCon, void * threadData);
  22. void T2_Callback(int result, NdbConnection * pCon, void * threadData);
  23. void T3_Callback_1(int result, NdbConnection * pCon, void * threadData);
  24. void T3_Callback_2(int result, NdbConnection * pCon, void * threadData);
  25. void T3_Callback_3(int result, NdbConnection * pCon, void * threadData);
  26. void T4_Callback_1(int result, NdbConnection * pCon, void * threadData);
  27. void T4_Callback_2(int result, NdbConnection * pCon, void * threadData);
  28. void T4_Callback_3(int result, NdbConnection * pCon, void * threadData);
  29. void T5_Callback_1(int result, NdbConnection * pCon, void * threadData);
  30. void T5_Callback_2(int result, NdbConnection * pCon, void * threadData);
  31. void T5_Callback_3(int result, NdbConnection * pCon, void * threadData);
  32. static int stat_async = 0;
  33. /**
  34.  * Transaction 1 - T1 
  35.  *
  36.  * Update location and changed by/time on a subscriber
  37.  *
  38.  * Input: 
  39.  *   SubscriberNumber,
  40.  *   Location,
  41.  *   ChangedBy,
  42.  *   ChangedTime
  43.  *
  44.  * Output:
  45.  */
  46. #define SFX_START (SUBSCRIBER_NUMBER_LENGTH - SUBSCRIBER_NUMBER_SUFFIX_LENGTH)
  47. inline
  48. NdbConnection *
  49. startTransaction(Ndb * pNDB, ThreadData * td){
  50.   return pNDB->startTransactionDGroup (0, 
  51.        &td->transactionData.number[SFX_START],
  52.        1);
  53. }
  54. void
  55. start_T1(Ndb * pNDB, ThreadData * td, int async){
  56.   DEBUG2("T1(%.*s): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  57.  td->transactionData.number); 
  58.   NdbConnection * pCON = 0;
  59.   while((pCON = startTransaction(pNDB, td)) == 0){
  60.     CHECK_ALLOWED_ERROR("T1: startTransaction", td, pNDB->getNdbError());
  61.     NdbSleep_MilliSleep(10);
  62.   }
  63.   NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
  64.   if (MyOp != NULL) {  
  65.     MyOp->updateTuple();  
  66.     MyOp->equal(IND_SUBSCRIBER_NUMBER, 
  67. td->transactionData.number);
  68.     MyOp->setValue(IND_SUBSCRIBER_LOCATION, 
  69.    (char *)&td->transactionData.location);
  70.     MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY, 
  71.    td->transactionData.changed_by);
  72.     MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME, 
  73.    td->transactionData.changed_time);
  74.     if (async == 1) {
  75.       pCON->executeAsynchPrepare( Commit , T1_Callback, td);
  76.     } else {
  77.       int result = pCON->execute(Commit);
  78.       T1_Callback(result, pCON, (void*)td);
  79.       return;
  80.     }//if
  81.   } else {
  82.     CHECK_NULL(MyOp, "T1: getNdbOperation", td, pCON->getNdbError());
  83.   }//if
  84. }
  85. void
  86. T1_Callback(int result, NdbConnection * pCON, void * threadData) {
  87.   ThreadData * td = (ThreadData *)threadData;
  88.   
  89.   DEBUG2("T1(%.*s): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
  90.  td->transactionData.number); 
  91.   if (result == -1) {
  92.     CHECK_ALLOWED_ERROR("T1: Commit", td, pCON->getNdbError());
  93.     td->pNDB->closeTransaction(pCON);
  94.     start_T1(td->pNDB, td, stat_async);
  95.     return;
  96.   }//if
  97.   td->pNDB->closeTransaction(pCON);
  98.   complete_T1(td);
  99. }
  100. /**
  101.  * Transaction 2 - T2
  102.  *
  103.  * Read from Subscriber:
  104.  *
  105.  * Input: 
  106.  *   SubscriberNumber
  107.  *
  108.  * Output:
  109.  *   Location
  110.  *   Changed by
  111.  *   Changed Timestamp
  112.  *   Name
  113.  */
  114. void
  115. start_T2(Ndb * pNDB, ThreadData * td, int async){
  116.   DEBUG3("T2(%.*s, %d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  117.  td->transactionData.number, 
  118.  td->transactionData.location);
  119.   
  120.   NdbConnection * pCON = 0;
  121.   
  122.   while((pCON = startTransaction(pNDB, td)) == 0){
  123.     CHECK_ALLOWED_ERROR("T2-1: startTransaction", td, pNDB->getNdbError());
  124.     NdbSleep_MilliSleep(10);
  125.   }
  126.   NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
  127.   CHECK_NULL(MyOp, "T2: getNdbOperation", td,
  128.      pCON->getNdbError());
  129.   
  130.   MyOp->readTuple();
  131.   MyOp->equal(IND_SUBSCRIBER_NUMBER,
  132.       td->transactionData.number);
  133.   MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
  134.  (char *)&td->transactionData.location);
  135.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
  136.  td->transactionData.changed_by);
  137.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
  138.  td->transactionData.changed_time);
  139.   MyOp->getValue(IND_SUBSCRIBER_NAME, 
  140.  td->transactionData.name);
  141.   if (async == 1) {
  142.     pCON->executeAsynchPrepare( Commit , T2_Callback, td);
  143.   } else {
  144.     int result = pCON->execute(Commit);
  145.     T2_Callback(result, pCON, (void*)td);
  146.     return;
  147.   }//if
  148. }
  149. void
  150. T2_Callback(int result, NdbConnection * pCON, void * threadData){
  151.   ThreadData * td = (ThreadData *)threadData;
  152.   DEBUG3("T2(%.*s, %d): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
  153.  td->transactionData.number, 
  154.  td->transactionData.location);
  155.   
  156.   if (result == -1) {
  157.     CHECK_ALLOWED_ERROR("T2: Commit", td, pCON->getNdbError());
  158.     td->pNDB->closeTransaction(pCON);
  159.     start_T2(td->pNDB, td, stat_async);
  160.     return;
  161.   }//if
  162.   td->pNDB->closeTransaction(pCON);
  163.   complete_T2(td);
  164. }
  165. /**
  166.  * Transaction 3 - T3
  167.  *
  168.  * Read session details
  169.  *
  170.  * Input:
  171.  *   SubscriberNumber
  172.  *   ServerId
  173.  *   ServerBit
  174.  *
  175.  * Output:
  176.  *   BranchExecuted
  177.  *   SessionDetails
  178.  *   ChangedBy
  179.  *   ChangedTime
  180.  *   Location
  181.  */
  182. void
  183. start_T3(Ndb * pNDB, ThreadData * td, int async){
  184.   DEBUG3("T3(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  185.  td->transactionData.number, 
  186.  td->transactionData.server_id);
  187.   
  188.   NdbConnection * pCON = 0;
  189.   while((pCON = startTransaction(pNDB, td)) == 0){
  190.     CHECK_ALLOWED_ERROR("T3-1: startTransaction", td, pNDB->getNdbError());
  191.     NdbSleep_MilliSleep(10);
  192.   }
  193.   
  194.   NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
  195.   CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
  196.      pCON->getNdbError());
  197.   
  198.   MyOp->readTuple();
  199.   MyOp->equal(IND_SUBSCRIBER_NUMBER, 
  200.       td->transactionData.number);
  201.   MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
  202.  (char *)&td->transactionData.location);
  203.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
  204.  td->transactionData.changed_by);
  205.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
  206.  td->transactionData.changed_time);
  207.   MyOp->getValue(IND_SUBSCRIBER_GROUP, 
  208.  (char *)&td->transactionData.group_id);
  209.   MyOp->getValue(IND_SUBSCRIBER_SESSIONS, 
  210.  (char *)&td->transactionData.sessions);
  211.   stat_async = async;
  212.   if (async == 1) {
  213.     pCON->executeAsynchPrepare( NoCommit , T3_Callback_1, td);
  214.   } else {
  215.     int result = pCON->execute( NoCommit );
  216.     T3_Callback_1(result, pCON, (void*)td);
  217.     return;
  218.   }//if
  219. }
  220. void
  221. T3_Callback_1(int result, NdbConnection * pCON, void * threadData){
  222.   ThreadData * td = (ThreadData *)threadData;
  223.   DEBUG3("T3(%.*s, %.2d): - Callback 1", SUBSCRIBER_NUMBER_LENGTH, 
  224.  td->transactionData.number, 
  225.  td->transactionData.server_id);
  226.   if (result == -1) {
  227.     CHECK_ALLOWED_ERROR("T3-1: execute", td, pCON->getNdbError());
  228.     td->pNDB->closeTransaction(pCON);
  229.     start_T3(td->pNDB, td, stat_async);
  230.     return;
  231.   }//if
  232.   NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
  233.   CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
  234.      pCON->getNdbError());
  235.     
  236.   MyOp->readTuple();
  237.   MyOp->equal(IND_GROUP_ID,
  238.       (char*)&td->transactionData.group_id);
  239.   MyOp->getValue(IND_GROUP_ALLOW_READ, 
  240.  (char *)&td->transactionData.permission);
  241.   if (stat_async == 1) {
  242.     pCON->executeAsynchPrepare( NoCommit , T3_Callback_2, td);
  243.   } else {
  244.     int result = pCON->execute( NoCommit );
  245.     T3_Callback_2(result, pCON, (void*)td);
  246.     return;
  247.   }//if
  248. }
  249. void
  250. T3_Callback_2(int result, NdbConnection * pCON, void * threadData){
  251.   ThreadData * td = (ThreadData *)threadData;
  252.   
  253.   if (result == -1) {
  254.     CHECK_ALLOWED_ERROR("T3-2: execute", td, pCON->getNdbError());
  255.     td->pNDB->closeTransaction(pCON);
  256.     start_T3(td->pNDB, td, stat_async);
  257.     return;
  258.   }//if
  259.   
  260.   Uint32 permission = td->transactionData.permission;
  261.   Uint32 sessions   = td->transactionData.sessions;
  262.   Uint32 server_bit = td->transactionData.server_bit;
  263.   if(((permission & server_bit) == server_bit) &&
  264.      ((sessions   & server_bit) == server_bit)){
  265.     
  266.     memcpy(td->transactionData.suffix,
  267.    &td->transactionData.number[SFX_START],
  268.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
  269.     DEBUG5("T3(%.*s, %.2d): - Callback 2 - reading(%.*s)", 
  270.    SUBSCRIBER_NUMBER_LENGTH, 
  271.    td->transactionData.number, 
  272.    td->transactionData.server_id,
  273.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
  274.    td->transactionData.suffix);
  275.     
  276.     /* Operation 3 */
  277.     NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
  278.     CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
  279.        pCON->getNdbError());
  280.     
  281.     MyOp->simpleRead();
  282.     MyOp->equal(IND_SESSION_SUBSCRIBER,
  283. (char*)td->transactionData.number);
  284.     MyOp->equal(IND_SESSION_SERVER,
  285. (char*)&td->transactionData.server_id);
  286.     MyOp->getValue(IND_SESSION_DATA, 
  287.    (char *)td->transactionData.session_details);
  288.     
  289.     /* Operation 4 */
  290.     MyOp = pCON->getNdbOperation(SERVER_TABLE);
  291.     CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
  292.        pCON->getNdbError());
  293.     
  294.     MyOp->interpretedUpdateTuple();
  295.     MyOp->equal(IND_SERVER_ID,
  296. (char*)&td->transactionData.server_id);
  297.     MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
  298. (char*)td->transactionData.suffix);
  299.     MyOp->incValue(IND_SERVER_READS, (uint32)1);
  300.     td->transactionData.branchExecuted = 1;
  301.   } else {
  302.     DEBUG3("T3(%.*s, %.2d): - Callback 2 - no read",
  303.    SUBSCRIBER_NUMBER_LENGTH, 
  304.    td->transactionData.number, 
  305.    td->transactionData.server_id);
  306.     td->transactionData.branchExecuted = 0;
  307.   }
  308.   if (stat_async == 1) {
  309.     pCON->executeAsynchPrepare( Commit , T3_Callback_3, td);
  310.   } else {
  311.     int result = pCON->execute( Commit );
  312.     T3_Callback_3(result, pCON, (void*)td);
  313.     return;
  314.   }//if
  315. }
  316. void
  317. T3_Callback_3(int result, NdbConnection * pCON, void * threadData){
  318.   ThreadData * td = (ThreadData *)threadData;  
  319.   DEBUG3("T3(%.*s, %.2d): - Completing", SUBSCRIBER_NUMBER_LENGTH, 
  320.  td->transactionData.number, 
  321.  td->transactionData.server_id);
  322.   
  323.   if (result == -1) {
  324.     CHECK_ALLOWED_ERROR("T3-3: Commit", td, pCON->getNdbError());
  325.     td->pNDB->closeTransaction(pCON);
  326.     start_T3(td->pNDB, td, stat_async);
  327.     return;
  328.   }//if
  329.   td->pNDB->closeTransaction(pCON);
  330.   complete_T3(td);
  331. }
  332. /**
  333.  * Transaction 4 - T4
  334.  * 
  335.  * Create session
  336.  *
  337.  * Input:
  338.  *   SubscriberNumber
  339.  *   ServerId
  340.  *   ServerBit
  341.  *   SessionDetails,
  342.  *   DoRollback
  343.  * Output:
  344.  *   ChangedBy
  345.  *   ChangedTime
  346.  *   Location
  347.  *   BranchExecuted
  348.  */
  349. void
  350. start_T4(Ndb * pNDB, ThreadData * td, int async){
  351.   DEBUG3("T4(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  352.  td->transactionData.number, 
  353.  td->transactionData.server_id);
  354.   
  355.   NdbConnection * pCON = 0;
  356.   while((pCON = startTransaction(pNDB, td)) == 0){
  357.     CHECK_ALLOWED_ERROR("T4-1: startTransaction", td, pNDB->getNdbError());
  358.     NdbSleep_MilliSleep(10);
  359.   }
  360.   
  361.   NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
  362.   CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
  363.      pCON->getNdbError());
  364.   
  365.   MyOp->interpretedUpdateTuple();
  366.   MyOp->equal(IND_SUBSCRIBER_NUMBER, 
  367.       td->transactionData.number);
  368.   MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
  369.  (char *)&td->transactionData.location);
  370.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
  371.  td->transactionData.changed_by);
  372.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
  373.  td->transactionData.changed_time);
  374.   MyOp->getValue(IND_SUBSCRIBER_GROUP,
  375.  (char *)&td->transactionData.group_id);
  376.   MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
  377.  (char *)&td->transactionData.sessions); 
  378.   MyOp->incValue(IND_SUBSCRIBER_SESSIONS, 
  379.  (uint32)td->transactionData.server_bit);
  380.   stat_async = async;
  381.   if (async == 1) {
  382.     pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td);
  383.   } else {
  384.     int result = pCON->execute( NoCommit );
  385.     T4_Callback_1(result, pCON, (void*)td);
  386.     return;
  387.   }//if
  388. }
  389. void
  390. T4_Callback_1(int result, NdbConnection * pCON, void * threadData){
  391.   ThreadData * td = (ThreadData *)threadData;  
  392.   if (result == -1) {
  393.     CHECK_ALLOWED_ERROR("T4-1: execute", td, pCON->getNdbError());
  394.     td->pNDB->closeTransaction(pCON);
  395.     start_T4(td->pNDB, td, stat_async);
  396.     return;
  397.   }//if
  398.   
  399.   DEBUG3("T4(%.*s, %.2d): - Callback 1", 
  400.  SUBSCRIBER_NUMBER_LENGTH, 
  401.  td->transactionData.number, 
  402.  td->transactionData.server_id);
  403.   NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
  404.   CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
  405.      pCON->getNdbError());
  406.   
  407.   MyOp->readTuple();
  408.   MyOp->equal(IND_GROUP_ID,
  409.       (char*)&td->transactionData.group_id);
  410.   MyOp->getValue(IND_GROUP_ALLOW_INSERT, 
  411.  (char *)&td->transactionData.permission);
  412.   if (stat_async == 1) {
  413.     pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td);
  414.   } else {
  415.     int result = pCON->execute( NoCommit );
  416.     T4_Callback_2(result, pCON, (void*)td);
  417.     return;
  418.   }//if
  419. }
  420. void
  421. T4_Callback_2(int result, NdbConnection * pCON, void * threadData){
  422.   ThreadData * td = (ThreadData *)threadData;  
  423.   if (result == -1) {
  424.     CHECK_ALLOWED_ERROR("T4-2: execute", td, pCON->getNdbError());
  425.     td->pNDB->closeTransaction(pCON);
  426.     start_T4(td->pNDB, td, stat_async);
  427.     return;
  428.   }//if
  429.   Uint32 permission = td->transactionData.permission;
  430.   Uint32 sessions   = td->transactionData.sessions;
  431.   Uint32 server_bit = td->transactionData.server_bit;
  432.   
  433.   if(((permission & server_bit) == server_bit) &&
  434.      ((sessions   & server_bit) == 0)){
  435.     
  436.     memcpy(td->transactionData.suffix,
  437.    &td->transactionData.number[SFX_START],
  438.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
  439.     
  440.     DEBUG5("T4(%.*s, %.2d): - Callback 2 - inserting(%.*s)", 
  441.    SUBSCRIBER_NUMBER_LENGTH, 
  442.    td->transactionData.number, 
  443.    td->transactionData.server_id,
  444.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
  445.    td->transactionData.suffix);
  446.     
  447.     /* Operation 3 */
  448.     
  449.     NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
  450.     CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
  451.        pCON->getNdbError());
  452.     
  453.     MyOp->insertTuple();
  454.     MyOp->equal(IND_SESSION_SUBSCRIBER,
  455. (char*)td->transactionData.number);
  456.     MyOp->equal(IND_SESSION_SERVER,
  457. (char*)&td->transactionData.server_id);
  458.     MyOp->setValue(SESSION_DATA, 
  459.    (char *)td->transactionData.session_details);
  460.     /* Operation 4 */
  461.     
  462.     /* Operation 5 */
  463.     MyOp = pCON->getNdbOperation(SERVER_TABLE);
  464.     CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
  465.        pCON->getNdbError());
  466.     
  467.     MyOp->interpretedUpdateTuple();
  468.     MyOp->equal(IND_SERVER_ID,
  469. (char*)&td->transactionData.server_id);
  470.     MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
  471. (char*)td->transactionData.suffix);
  472.     MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
  473.     td->transactionData.branchExecuted = 1;
  474.   } else {
  475.     td->transactionData.branchExecuted = 0;
  476.     DEBUG5("T4(%.*s, %.2d): - Callback 2 - %s %s",
  477.    SUBSCRIBER_NUMBER_LENGTH, 
  478.    td->transactionData.number, 
  479.    td->transactionData.server_id,
  480.    ((permission & server_bit) ? 
  481.     "permission - " : "no permission - "),
  482.    ((sessions   & server_bit) ? 
  483.     "in session - " : "no in session - "));
  484.   }
  485.   
  486.   if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
  487.     if (stat_async == 1) {
  488.       pCON->executeAsynchPrepare( Commit , T4_Callback_3, td);
  489.     } else {
  490.       int result = pCON->execute( Commit );
  491.       T4_Callback_3(result, pCON, (void*)td);
  492.       return;
  493.     }//if
  494.   } else {
  495.     if (stat_async == 1) {
  496.       pCON->executeAsynchPrepare( Rollback , T4_Callback_3, td);
  497.     } else {
  498.       int result = pCON->execute( Rollback );
  499.       T4_Callback_3(result, pCON, (void*)td);
  500.       return;
  501.     }//if
  502.   }
  503. }
  504. void
  505. T4_Callback_3(int result, NdbConnection * pCON, void * threadData){
  506.   ThreadData * td = (ThreadData *)threadData;  
  507.   if (result == -1) {
  508.     CHECK_ALLOWED_ERROR("T4-3: Commit", td, pCON->getNdbError());
  509.     td->pNDB->closeTransaction(pCON);
  510.     start_T4(td->pNDB, td, stat_async);
  511.     return;
  512.   }//if
  513.   
  514.   DEBUG3("T4(%.*s, %.2d): - Completing", 
  515.  SUBSCRIBER_NUMBER_LENGTH, 
  516.  td->transactionData.number, 
  517.  td->transactionData.server_id);
  518.   td->pNDB->closeTransaction(pCON);
  519.   complete_T4(td);
  520. }
  521. /**
  522.  * Transaction 5 - T5
  523.  * 
  524.  * Delete session
  525.  *
  526.  * Input:
  527.  *   SubscriberNumber
  528.  *   ServerId
  529.  *   ServerBit
  530.  *   DoRollback
  531.  * Output:
  532.  *   ChangedBy
  533.  *   ChangedTime
  534.  *   Location
  535.  *   BranchExecuted
  536.  */
  537. void
  538. start_T5(Ndb * pNDB, ThreadData * td, int async){
  539.   DEBUG3("T5(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH, 
  540.  td->transactionData.number, 
  541.  td->transactionData.server_id);
  542.   NdbConnection * pCON = 0;
  543.   while((pCON = startTransaction(pNDB, td)) == 0){
  544.     CHECK_ALLOWED_ERROR("T5-1: startTransaction", td, pNDB->getNdbError());
  545.     NdbSleep_MilliSleep(10);
  546.   }
  547.   
  548.   NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
  549.   CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
  550.      pCON->getNdbError());
  551.   
  552.   MyOp->interpretedUpdateTuple();
  553.   MyOp->equal(IND_SUBSCRIBER_NUMBER, 
  554.       td->transactionData.number);
  555.   MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
  556.  (char *)&td->transactionData.location);
  557.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
  558.  td->transactionData.changed_by);
  559.   MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
  560.  td->transactionData.changed_time);
  561.   MyOp->getValue(IND_SUBSCRIBER_GROUP,
  562.  (char *)&td->transactionData.group_id);
  563.   MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
  564.  (char *)&td->transactionData.sessions);
  565.   MyOp->subValue(IND_SUBSCRIBER_SESSIONS, 
  566.  (uint32)td->transactionData.server_bit);
  567.   stat_async = async;
  568.   if (async == 1) {
  569.     pCON->executeAsynchPrepare( NoCommit , T5_Callback_1, td);
  570.   } else {
  571.     int result = pCON->execute( NoCommit );
  572.     T5_Callback_1(result, pCON, (void*)td);
  573.     return;
  574.   }//if
  575. }
  576. void
  577. T5_Callback_1(int result, NdbConnection * pCON, void * threadData){
  578.   ThreadData * td = (ThreadData *)threadData;  
  579.   if (result == -1) {
  580.     CHECK_ALLOWED_ERROR("T5-1: execute", td, pCON->getNdbError());
  581.     td->pNDB->closeTransaction(pCON);
  582.     start_T5(td->pNDB, td, stat_async);
  583.     return;
  584.   }//if
  585.   DEBUG3("T5(%.*s, %.2d): - Callback 1", 
  586.  SUBSCRIBER_NUMBER_LENGTH, 
  587.  td->transactionData.number, 
  588.  td->transactionData.server_id);
  589.   
  590.   NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
  591.   CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
  592.      pCON->getNdbError());
  593.   
  594.   MyOp->readTuple();
  595.   MyOp->equal(IND_GROUP_ID,
  596.       (char*)&td->transactionData.group_id);
  597.   MyOp->getValue(IND_GROUP_ALLOW_DELETE, 
  598.  (char *)&td->transactionData.permission);
  599.   if (stat_async == 1) {
  600.     pCON->executeAsynchPrepare( NoCommit , T5_Callback_2, td);
  601.   } else {
  602.     int result = pCON->execute( NoCommit );
  603.     T5_Callback_2(result, pCON, (void*)td);
  604.     return;
  605.   }//if
  606. }
  607. void
  608. T5_Callback_2(int result, NdbConnection * pCON, void * threadData){
  609.   ThreadData * td = (ThreadData *)threadData;  
  610.   if (result == -1) {
  611.     CHECK_ALLOWED_ERROR("T5-2: execute", td, pCON->getNdbError());
  612.     td->pNDB->closeTransaction(pCON);
  613.     start_T5(td->pNDB, td, stat_async);
  614.     return;
  615.   }//if
  616.   Uint32 permission = td->transactionData.permission;
  617.   Uint32 sessions   = td->transactionData.sessions;
  618.   Uint32 server_bit = td->transactionData.server_bit;
  619.   if(((permission & server_bit) == server_bit) &&
  620.      ((sessions   & server_bit) == server_bit)){
  621.     
  622.     memcpy(td->transactionData.suffix,
  623.    &td->transactionData.number[SFX_START],
  624.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
  625.     
  626.     DEBUG5("T5(%.*s, %.2d): - Callback 2 - deleting(%.*s)", 
  627.    SUBSCRIBER_NUMBER_LENGTH, 
  628.    td->transactionData.number, 
  629.    td->transactionData.server_id,
  630.    SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
  631.    td->transactionData.suffix);
  632.     
  633.     /* Operation 3 */
  634.     NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
  635.     CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
  636.        pCON->getNdbError());
  637.     
  638.     MyOp->deleteTuple();
  639.     MyOp->equal(IND_SESSION_SUBSCRIBER,
  640. (char*)td->transactionData.number);
  641.     MyOp->equal(IND_SESSION_SERVER,
  642. (char*)&td->transactionData.server_id);
  643.     /* Operation 4 */
  644.     
  645.     /* Operation 5 */
  646.     MyOp = pCON->getNdbOperation(SERVER_TABLE);
  647.     CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
  648.        pCON->getNdbError());
  649.     
  650.     MyOp->interpretedUpdateTuple();
  651.     MyOp->equal(IND_SERVER_ID,
  652. (char*)&td->transactionData.server_id);
  653.     MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
  654. (char*)td->transactionData.suffix);
  655.     MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
  656.     td->transactionData.branchExecuted = 1;
  657.   } else {
  658.     td->transactionData.branchExecuted = 0;
  659.     DEBUG5("T5(%.*s, %.2d): - Callback 2 - no delete - %s %s", 
  660.    SUBSCRIBER_NUMBER_LENGTH, 
  661.    td->transactionData.number, 
  662.    td->transactionData.server_id,
  663.    ((permission & server_bit) ? 
  664.     "permission - " : "no permission - "),
  665.    ((sessions   & server_bit) ? 
  666.     "in session - " : "no in session - "));
  667.   }
  668.   
  669.   if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
  670.     if (stat_async == 1) {
  671.       pCON->executeAsynchPrepare( Commit , T5_Callback_3, td);
  672.     } else {
  673.       int result = pCON->execute( Commit );
  674.       T5_Callback_3(result, pCON, (void*)td);
  675.       return;
  676.     }//if
  677.   } else {
  678.     if (stat_async == 1) {
  679.       pCON->executeAsynchPrepare( Rollback , T5_Callback_3, td);
  680.     } else {
  681.       int result = pCON->execute( Rollback );
  682.       T5_Callback_3(result, pCON, (void*)td);
  683.       return;
  684.     }//if
  685.   }
  686. }
  687. void
  688. T5_Callback_3(int result, NdbConnection * pCON, void * threadData){
  689.   ThreadData * td = (ThreadData *)threadData;  
  690.   if (result == -1) {
  691.     CHECK_ALLOWED_ERROR("T5-3: Commit", td, pCON->getNdbError());
  692.     td->pNDB->closeTransaction(pCON);
  693.     start_T5(td->pNDB, td, stat_async);
  694.     return;
  695.   }//if
  696.   
  697.   DEBUG3("T5(%.*s, %.2d): - Completing", 
  698.  SUBSCRIBER_NUMBER_LENGTH, 
  699.  td->transactionData.number, 
  700.  td->transactionData.server_id);
  701.   
  702.   td->pNDB->closeTransaction(pCON);
  703.   complete_T5(td);
  704. }