msa.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:36k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <NdbApi.hpp>
  15. #include <NdbSchemaCon.hpp>
  16. #include <NdbCondition.h>
  17. #include <NdbMutex.h>
  18. #include <NdbSleep.h>
  19. #include <NdbThread.h>
  20. #include <NdbTick.h>
  21. const char* const c_szDatabaseName = "TEST_DB";
  22. const char* const c_szTableNameStored = "CCStored";
  23. const char* const c_szTableNameTemp = "CCTemp";
  24. const char* const c_szContextId = "ContextId";
  25. const char* const c_szVersion = "Version";
  26. const char* const c_szLockFlag = "LockFlag";
  27. const char* const c_szLockTime = "LockTime";
  28. const char* const c_szLockTimeUSec = "LockTimeUSec";
  29. const char* const c_szContextData = "ContextData";
  30. const char* g_szTableName = c_szTableNameStored;
  31. #ifdef NDB_WIN32
  32. HANDLE hShutdownEvent = 0;
  33. #else
  34. #include <signal.h>
  35. bool bShutdownEvent = false;
  36. #endif
  37. long g_nMaxContextIdPerThread = 5000;
  38. long g_nNumThreads = 0;
  39. long g_nMaxCallsPerSecond = 0;
  40. long g_nMaxRetry = 50;
  41. bool g_bWriteTuple = false;
  42. bool g_bInsertInitial = false;
  43. bool g_bVerifyInitial = false;
  44. NdbMutex* g_pNdbMutexPrintf = 0;
  45. NdbMutex* g_pNdbMutexIncrement = 0;
  46. long g_nNumCallsProcessed = 0;
  47. NDB_TICKS g_tStartTime = 0;
  48. NDB_TICKS g_tEndTime = 0;
  49. long g_nNumberOfInitialInsert = 0;
  50. long g_nNumberOfInitialVerify = 0;
  51. const long c_nMaxMillisecForAllCall = 5000;
  52. long* g_plCountMillisecForCall = 0;
  53. const long c_nMaxMillisecForAllTrans = 5000;
  54. long* g_plCountMillisecForTrans = 0;
  55. bool g_bReport = false;
  56. bool g_bReportPlus = false;
  57. // data for CALL_CONTEXT and GROUP_RESOURCE
  58. static char STATUS_DATA[]= 
  59. "000102030405060708090A0B0C0D0E0F000102030405060708090A0B0C0D0E0F"
  60. "101112131415161718191A1B1C1D1E1F000102030405060708090A0B0C0D0E0F"
  61. "202122232425262728292A2B2C2D2E2F000102030405060708090A0B0C0D0E0F"
  62. "303132333435363738393A3B3C3D3E3F000102030405060708090A0B0C0D0E0F"
  63. "404142434445464748494A4B4C4D4E4F000102030405060708090A0B0C0D0E0F"
  64. "505152535455565758595A5B5C5D5E5F000102030405060708090A0B0C0D0E0F"
  65. "606162636465666768696A6B6C6D6E6F000102030405060708090A0B0C0D0E0F"
  66. "707172737475767778797A7B7C7D7E7F000102030405060708090A0B0C0D0E0F"
  67. "808182838485868788898A8B8C8D8E8F000102030405060708090A0B0C0D0E0F"
  68. "909192939495969798999A9B9C9D9E9F000102030405060708090A0B0C0D0E0F"
  69. "10010110210310410510610710810910A000102030405060708090A0B0C0D0EF"
  70. "10B10C10D10E10F110111112113114115000102030405060708090A0B0C0D0EF"
  71. "11611711811911A11B11C11D11E11F120000102030405060708090A0B0C0D0EF"
  72. "12112212312412512612712812912A12B000102030405060708090A0B0C0D0EF"
  73. "12C12D12E12F130131132134135136137000102030405060708090A0B0C0D0EF"
  74. "13813913A13B13C13D13E13F140141142000102030405060708090A0B0C0D0EF"
  75. "14314414514614714814914A14B14C14D000102030405060708090A0B0C0D0EF"
  76. "14E14F150151152153154155156157158000102030405060708090A0B0C0D0EF"
  77. "15915A15B15C15D15E15F160161162163000102030405060708090A0B0C0D0EF"
  78. "16416516616716816916A16B16C16D16E000102030405060708090A0B0C0D0EF"
  79. "16F170171172173174175176177178179000102030405060708090A0B0C0D0EF"
  80. "17A17B17C17D17E17F180181182183184000102030405060708090A0B0C0D0EF"
  81. "18518618718818918A18B18C18D18E18F000102030405060708090A0B0C0D0EF"
  82. "19019119219319419519619719819919A000102030405060708090A0B0C0D0EF"
  83. "19B19C19D19E19F200201202203204205000102030405060708090A0B0C0D0EF"
  84. "20620720820920A20B20C20D20F210211000102030405060708090A0B0C0D0EF"
  85. "21221321421521621721821921A21B21C000102030405060708090A0B0C0D0EF"
  86. "21D21E21F220221222223224225226227000102030405060708090A0B0C0D0EF"
  87. "22822922A22B22C22D22E22F230231232000102030405060708090A0B0C0D0EF"
  88. "23323423523623723823923A23B23C23D000102030405060708090A0B0C0D0EF"
  89. "23E23F240241242243244245246247248000102030405060708090A0B0C0D0EF"
  90. "24924A24B24C24D24E24F250251252253000102030405060708090A0B0C0D0EF"
  91. "101112131415161718191A1B1C1D1E1F000102030405060708090A0B0C0D0E0F"
  92. "202122232425262728292A2B2C2D2E2F000102030405060708090A0B0C0D0E0F"
  93. "303132333435363738393A3B3C3D3E3F000102030405060708090A0B0C0D0E0F"
  94. "404142434445464748494A4B4C4D4E4F000102030405060708090A0B0C0D0E0F"
  95. "505152535455565758595A5B5C5D5E5F000102030405060708090A0B0C0D0E0F"
  96. "606162636465666768696A6B6C6D6E6F000102030405060708090A0B0C0D0E0F"
  97. "707172737475767778797A7B7C7D7E7F000102030405060708090A0B0C0D0E0F"
  98. "808182838485868788898A8B8C8D8E8F000102030405060708090A0B0C0D0E0F"
  99. "909192939495969798999A9B9C9D9E9F000102030405060708090A0B0C0D0E0F"
  100. "10010110210310410510610710810910A000102030405060708090A0B0C0D0EF"
  101. "10B10C10D10E10F110111112113114115000102030405060708090A0B0C0D0EF"
  102. "11611711811911A11B11C11D11E11F120000102030405060708090A0B0C0D0EF"
  103. "12112212312412512612712812912A12B000102030405060708090A0B0C0D0EF"
  104. "12C12D12E12F130131132134135136137000102030405060708090A0B0C0D0EF"
  105. "13813913A13B13C13D13E13F140141142000102030405060708090A0B0C0D0EF"
  106. "14314414514614714814914A14B14C14D000102030405060708090A0B0C0D0EF"
  107. "14E14F150151152153154155156157158000102030405060708090A0B0C0D0EF"
  108. "15915A15B15C15D15E15F160161162163000102030405060708090A0B0C0D0EF"
  109. "16416516616716816916A16B16C16D16E000102030405060708090A0B0C0D0EF"
  110. "16F170171172173174175176177178179000102030405060708090A0B0C0D0EF"
  111. "17A17B17C17D17E17F180181182183184000102030405060708090A0B0C0D0EF"
  112. "18518618718818918A18B18C18D18E18F000102030405060708090A0B0C0D0EF"
  113. "19019119219319419519619719819919A000102030405060708090A0B0C0D0EF"
  114. "19B19C19D19E19F200201202203204205000102030405060708090A0B0C0D0EF"
  115. "20620720820920A20B20C20D20F210211000102030405060708090A0B0C0D0EF"
  116. "21221321421521621721821921A21B21C000102030405060708090A0B0C0D0EF"
  117. "21D21E21F220221222223224225226227000102030405060708090A0B0C0D0EF"
  118. "22822922A22B22C22D22E22F230231232000102030405060708090A0B0C0D0EF"
  119. "23323423523623723823923A23B23C23D000102030405060708090A0B0C0D0EF"
  120. "2366890FE1438751097E7F6325DC0E6326F"
  121. "25425525625725825925A25B25C25D25E25F000102030405060708090A0B0C0F";     
  122. long g_nStatusDataSize = sizeof(STATUS_DATA);
  123. // Thread function for Call Context Inserts
  124. #ifdef NDB_WIN32
  125. BOOL WINAPI ConsoleCtrlHandler(DWORD dwCtrlType)
  126. {
  127.     if(CTRL_C_EVENT == dwCtrlType)
  128.     {
  129.         SetEvent(hShutdownEvent);
  130.         return TRUE;
  131.     }
  132.     return FALSE;
  133. }
  134. #else
  135. void CtrlCHandler(int)
  136. {
  137.     bShutdownEvent = true;
  138. }
  139. #endif
  140. void ReportNdbError(const char* szMsg, const NdbError& err)
  141. {
  142.     NdbMutex_Lock(g_pNdbMutexPrintf);
  143.     printf("%s: %d: %sn", szMsg, err.code, (err.message ? err.message : ""));
  144.     NdbMutex_Unlock(g_pNdbMutexPrintf);
  145. }
  146. void
  147. ReportCallsPerSecond(long nNumCallsProcessed, 
  148.                      NDB_TICKS tStartTime, 
  149.                      NDB_TICKS tEndTime)
  150. {
  151.     NDB_TICKS tElapsed = tEndTime - tStartTime;
  152.     long lCallsPerSec;
  153.     if(tElapsed>0)
  154.         lCallsPerSec = (long)((1000*nNumCallsProcessed)/tElapsed);
  155.     else
  156.         lCallsPerSec = 0;
  157.     NdbMutex_Lock(g_pNdbMutexPrintf);
  158.     printf("Time Taken for %ld Calls is %ld msec (= %ld calls/sec)n",
  159.         nNumCallsProcessed, (long)tElapsed, lCallsPerSec);
  160.     NdbMutex_Unlock(g_pNdbMutexPrintf);
  161. }
  162. #ifndef NDB_WIN32
  163. void InterlockedIncrement(long* lp)             // expensive
  164. {
  165.     NdbMutex_Lock(g_pNdbMutexIncrement);
  166.     (*lp)++;
  167.     NdbMutex_Unlock(g_pNdbMutexIncrement);
  168. }
  169. #endif
  170. void InterlockedIncrementAndReport(void)
  171. {
  172.     NdbMutex_Lock(g_pNdbMutexIncrement);
  173.     ++g_nNumCallsProcessed;
  174.     if((g_nNumCallsProcessed%1000)==0) 
  175.     {
  176.         g_tEndTime = NdbTick_CurrentMillisecond();
  177.         if(g_tStartTime) 
  178.             ReportCallsPerSecond(1000, g_tStartTime, g_tEndTime);
  179.         g_tStartTime = g_tEndTime;
  180.     }
  181.     NdbMutex_Unlock(g_pNdbMutexIncrement);
  182. }
  183. void SleepOneCall(void)
  184. {
  185.     int iMillisecToSleep;
  186.     if(g_nMaxCallsPerSecond>0)
  187.         iMillisecToSleep = (1000*g_nNumThreads)/g_nMaxCallsPerSecond;
  188.     else
  189.         iMillisecToSleep = 50;
  190.     if(iMillisecToSleep>0)
  191.         NdbSleep_MilliSleep(iMillisecToSleep);
  192. }
  193. int QueryTransaction(Ndb* pNdb, 
  194.                      long iContextId,                       
  195.                      long* piVersion, 
  196.                      long* piLockFlag, 
  197.                      long* piLockTime, 
  198.                      long* piLockTimeUSec, 
  199.                      char* pchContextData, 
  200.                      NdbError& err)
  201. {
  202.     int iRes = -1;
  203.     NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
  204.     if(pNdbConnection)
  205.     {
  206.         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
  207.         if(pNdbOperation)
  208.         {
  209.             NdbRecAttr* pNdbRecAttrVersion;
  210.             NdbRecAttr* pNdbRecAttrLockFlag;
  211.             NdbRecAttr* pNdbRecAttrLockTime;
  212.             NdbRecAttr* pNdbRecAttrLockTimeUSec;
  213.             NdbRecAttr* pNdbRecAttrContextData;
  214.             if(!pNdbOperation->readTuple()
  215.             && !pNdbOperation->equal(c_szContextId, (Int32)iContextId)
  216.             && (pNdbRecAttrVersion=pNdbOperation->getValue(c_szVersion, (char*)piVersion))
  217.             && (pNdbRecAttrLockFlag=pNdbOperation->getValue(c_szLockFlag, (char*)piLockFlag))
  218.             && (pNdbRecAttrLockTime=pNdbOperation->getValue(c_szLockTime, (char*)piLockTime))
  219.             && (pNdbRecAttrLockTimeUSec=pNdbOperation->getValue(c_szLockTimeUSec, (char*)piLockTimeUSec))
  220.             && (pNdbRecAttrContextData=pNdbOperation->getValue(c_szContextData, pchContextData)))
  221.             {
  222.                 if(!pNdbConnection->execute(Commit))
  223.                     iRes = 0;
  224.                 else 
  225.                     err = pNdbConnection->getNdbError();
  226.             } 
  227.             else 
  228.                 err = pNdbOperation->getNdbError();
  229.         } 
  230.         else 
  231.             err = pNdbConnection->getNdbError();
  232.         pNdb->closeTransaction(pNdbConnection);
  233.     } 
  234.     else 
  235.         err = pNdb->getNdbError();
  236.     
  237.     return iRes;
  238. }
  239. int RetryQueryTransaction(Ndb* pNdb, 
  240.                           long iContextId, 
  241.                           long* piVersion, 
  242.                           long* piLockFlag, 
  243.                           long* piLockTime, 
  244.                           long* piLockTimeUSec, 
  245.                           char* pchContextData, 
  246.                           NdbError& err, 
  247.                           int& nRetry)
  248. {
  249.     int iRes = -1;
  250.     nRetry = 0;
  251.     bool bRetry = true;
  252.     while(bRetry && nRetry<g_nMaxRetry)
  253.     {
  254.         if(!QueryTransaction(pNdb, iContextId, piVersion, piLockFlag, 
  255.             piLockTime, piLockTimeUSec, pchContextData, err))
  256.         {
  257.             iRes = 0;
  258.             bRetry = false;
  259.         }
  260.         else
  261.         {
  262.             switch(err.status)
  263.             {
  264.             case NdbError::TemporaryError:
  265.             case NdbError::UnknownResult:
  266.                 SleepOneCall();
  267.                 ++nRetry;
  268.                 break;
  269.             
  270.             case NdbError::PermanentError:
  271.             default:
  272.                 bRetry = false;
  273.                 break;
  274.             }
  275.         }
  276.     }
  277.     return iRes;
  278. }
  279. int DeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err)
  280. {
  281.     int iRes = -1;
  282.     NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
  283.     if(pNdbConnection)
  284.     {
  285.         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
  286.         if(pNdbOperation)
  287.         {
  288.             if(!pNdbOperation->deleteTuple()
  289.             && !pNdbOperation->equal(c_szContextId, (Int32)iContextId)) 
  290.             {
  291.                 if(pNdbConnection->execute(Commit) == 0) 
  292.                     iRes = 0;
  293.                 else 
  294.                     err = pNdbConnection->getNdbError();
  295.             } 
  296.             else 
  297.                 err = pNdbOperation->getNdbError();
  298.         } 
  299.         else 
  300.             err = pNdbConnection->getNdbError();
  301.         pNdb->closeTransaction(pNdbConnection);
  302.     } 
  303.     else 
  304.         err = pNdb->getNdbError();
  305.     return iRes;
  306. }
  307. int RetryDeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err, int& nRetry)
  308. {
  309.     int iRes = -1;
  310.     nRetry = 0;
  311.     bool bRetry = true;
  312.     bool bUnknown = false;
  313.     while(bRetry && nRetry<g_nMaxRetry)
  314.     {
  315.         if(!DeleteTransaction(pNdb, iContextId, err))
  316.         {
  317.             iRes = 0;
  318.             bRetry = false;
  319.         }
  320.         else
  321.         {
  322.             switch(err.status)
  323.             {
  324.             case NdbError::UnknownResult:
  325.                 bUnknown = true;
  326.                 ++nRetry;
  327.                 break;
  328.             case NdbError::TemporaryError:
  329.                 bUnknown = false;
  330.                 SleepOneCall();
  331.                 ++nRetry;
  332.                 break;
  333.             
  334.             case NdbError::PermanentError:
  335.                 if(err.code==626 && bUnknown)
  336.                     iRes = 0;
  337.                 bRetry = false;
  338.                 break;
  339.             default:
  340.                 bRetry = false;
  341.                 break;
  342.             }
  343.         }
  344.     }
  345.     return iRes;
  346. }
  347. int InsertTransaction(Ndb* pNdb, 
  348.                       long iContextID, 
  349.                       long iVersion, 
  350.                       long iLockFlag, 
  351.                       long iLockTime, 
  352.                       long iLockTimeUSec, 
  353.                       const char* pchContextData, 
  354.                       NdbError& err)
  355. {
  356.     int iRes = -1;
  357.     NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextID, 4);
  358.     if(pNdbConnection)
  359.     {
  360.         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
  361.         if(pNdbOperation) 
  362.         {
  363.             if(!(g_bWriteTuple ? pNdbOperation->writeTuple() : pNdbOperation->insertTuple())
  364.                 && !pNdbOperation->equal(c_szContextId, (Int32)iContextID)
  365.                 && !pNdbOperation->setValue(c_szVersion, (Int32)iVersion)
  366.                 && !pNdbOperation->setValue(c_szLockFlag, (Int32)iLockFlag)
  367.                 && !pNdbOperation->setValue(c_szLockTime, (Int32)iLockTime)
  368.                 && !pNdbOperation->setValue(c_szLockTimeUSec, (Int32)iLockTimeUSec)
  369.                 && !pNdbOperation->setValue(c_szContextData, pchContextData, g_nStatusDataSize))  
  370.             {
  371.                 if(!pNdbConnection->execute(Commit))
  372.                     iRes = 0;
  373.                 else 
  374.                     err = pNdbConnection->getNdbError();
  375.             }
  376.             else 
  377.                 err = pNdbOperation->getNdbError();
  378.         } 
  379.         else 
  380.             err = pNdbConnection->getNdbError();
  381.         pNdb->closeTransaction(pNdbConnection);
  382.     } 
  383.     else 
  384.         err = pNdb->getNdbError();
  385.     return iRes;
  386. }
  387. int RetryInsertTransaction(Ndb* pNdb, 
  388.                            long iContextId, 
  389.                            long iVersion, 
  390.                            long iLockFlag, 
  391.                            long iLockTime, 
  392.                            long iLockTimeUSec, 
  393.                            const char* pchContextData, 
  394.                            NdbError& err, int& nRetry)
  395. {
  396.     int iRes = -1;
  397.     nRetry = 0;
  398.     bool bRetry = true;
  399.     bool bUnknown = false;
  400.     while(bRetry && nRetry<g_nMaxRetry)
  401.     {
  402.         if(!InsertTransaction(pNdb, iContextId, iVersion, iLockFlag, 
  403.             iLockTime, iLockTimeUSec, pchContextData, err))
  404.         {
  405.             iRes = 0;
  406.             bRetry = false;
  407.         }
  408.         else
  409.         {
  410.             switch(err.status)
  411.             {
  412.             case NdbError::UnknownResult:
  413.                 bUnknown = true;
  414.                 ++nRetry;
  415.                 break;
  416.             case NdbError::TemporaryError:
  417.                 bUnknown = false;
  418.                 SleepOneCall();
  419.                 ++nRetry;
  420.                 break;
  421.             
  422.             case NdbError::PermanentError:
  423.                 if(err.code==630 && bUnknown)
  424.                     iRes = 0;
  425.                 bRetry = false;
  426.                 break;
  427.             default:
  428.                 bRetry = false;
  429.                 break;
  430.             }
  431.         }
  432.     }
  433.     return iRes;
  434. }
  435. int UpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err)
  436. {
  437.     int iRes = -1;
  438.     NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
  439.     if(pNdbConnection)
  440.     {
  441.         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
  442.         if(pNdbOperation)
  443.         {
  444.             if(!pNdbOperation->updateTuple()
  445.             && !pNdbOperation->equal(c_szContextId, (Int32)iContextId)
  446.             && !pNdbOperation->setValue(c_szContextData, STATUS_DATA, g_nStatusDataSize))
  447.             {
  448.                 if(!pNdbConnection->execute(Commit))
  449.                     iRes = 0;
  450.                 else 
  451.                     err = pNdbConnection->getNdbError();
  452.             }
  453.             else 
  454.                 err = pNdbOperation->getNdbError();
  455.         } 
  456.         else 
  457.             err = pNdbConnection->getNdbError();
  458.         pNdb->closeTransaction(pNdbConnection);
  459.     } 
  460.     else 
  461.         err = pNdb->getNdbError();
  462.     return iRes;
  463. }
  464. int RetryUpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err, int& nRetry)
  465. {
  466.     int iRes = -1;
  467.     nRetry = 0;
  468.     bool bRetry = true;
  469.     while(bRetry && nRetry<g_nMaxRetry)
  470.     {
  471.         if(!UpdateTransaction(pNdb, iContextId, err))
  472.         {
  473.             iRes = 0;
  474.             bRetry = false;
  475.         }
  476.         else
  477.         {
  478.             switch(err.status)
  479.             {
  480.             case NdbError::TemporaryError:
  481.             case NdbError::UnknownResult:
  482.                 SleepOneCall();
  483.                 ++nRetry;
  484.                 break;
  485.             
  486.             case NdbError::PermanentError:
  487.             default:
  488.                 bRetry = false;
  489.                 break;
  490.             }
  491.         }
  492.     }
  493.     return iRes;
  494. }
  495. int InsertInitialRecords(Ndb* pNdb, long nInsert, long nSeed)
  496. {
  497.     int iRes = -1;
  498.     char szMsg[100];
  499.     for(long i=0; i<nInsert; ++i) 
  500.     {
  501.         int iContextID = i+nSeed;
  502.         int nRetry = 0;
  503.         NdbError err;
  504.         memset(&err, 0, sizeof(err));
  505.         NDB_TICKS tStartTrans = NdbTick_CurrentMillisecond();
  506.         iRes = RetryInsertTransaction(pNdb, iContextID, nSeed, iContextID,
  507.             (long)(tStartTrans/1000), (long)((tStartTrans%1000)*1000), 
  508.             STATUS_DATA, err, nRetry);
  509.         NDB_TICKS tEndTrans = NdbTick_CurrentMillisecond();
  510.         long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
  511.         if(nRetry>0)
  512.         {
  513.             sprintf(szMsg, "insert retried %d times, time %ld msec.", 
  514.                 nRetry, lMillisecForThisTrans);
  515.             ReportNdbError(szMsg, err);
  516.         }
  517.         if(iRes)
  518.         {
  519.             ReportNdbError("Insert initial record failed", err);
  520.             return iRes;
  521.         }
  522.         InterlockedIncrement(&g_nNumberOfInitialInsert);
  523.     }
  524.     return iRes;
  525. }
  526. int VerifyInitialRecords(Ndb* pNdb, long nVerify, long nSeed)
  527. {
  528.     int iRes = -1;
  529.     char* pchContextData = new char[g_nStatusDataSize];
  530.     char szMsg[100];
  531.     long iPrevLockTime = -1;
  532.     long iPrevLockTimeUSec = -1;
  533.     for(long i=0; i<nVerify; ++i) 
  534.     {
  535.         int iContextID = i+nSeed;
  536.         long iVersion = 0;
  537.         long iLockFlag = 0;
  538.         long iLockTime = 0;
  539.         long iLockTimeUSec = 0;
  540.         int nRetry = 0;
  541.         NdbError err;
  542.         memset(&err, 0, sizeof(err));
  543.         NDB_TICKS tStartTrans = NdbTick_CurrentMillisecond();
  544.         iRes = RetryQueryTransaction(pNdb, iContextID, &iVersion, &iLockFlag, 
  545.                     &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry);
  546.         NDB_TICKS tEndTrans = NdbTick_CurrentMillisecond();
  547.         long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
  548.         if(nRetry>0)
  549.         {
  550.             sprintf(szMsg, "verify retried %d times, time %ld msec.", 
  551.                 nRetry, lMillisecForThisTrans);
  552.             ReportNdbError(szMsg, err);
  553.         }
  554.         if(iRes)
  555.         {
  556.             ReportNdbError("Read initial record failed", err);
  557.             delete[] pchContextData;
  558.             return iRes;
  559.         }
  560.         if(memcmp(pchContextData, STATUS_DATA, g_nStatusDataSize))
  561.         {
  562.             sprintf(szMsg, "wrong context data in tuple %d", iContextID);
  563.             ReportNdbError(szMsg, err);
  564.             delete[] pchContextData;
  565.             return -1;
  566.         }
  567.         if(iVersion!=nSeed 
  568.             || iLockFlag!=iContextID 
  569.             || iLockTime<iPrevLockTime 
  570.             || (iLockTime==iPrevLockTime && iLockTimeUSec<iPrevLockTimeUSec))
  571.         {
  572.             sprintf(szMsg, "wrong call data in tuple %d", iContextID);
  573.             ReportNdbError(szMsg, err);
  574.             delete[] pchContextData;
  575.             return -1;
  576.         }
  577.         iPrevLockTime = iLockTime;
  578.         iPrevLockTimeUSec = iLockTimeUSec;
  579.         InterlockedIncrement(&g_nNumberOfInitialVerify);
  580.     }
  581.     delete[] pchContextData;
  582.     return iRes;
  583. }
  584. void* RuntimeCallContext(void* lpParam)
  585. {
  586.     long nNumCallsProcessed = 0;
  587.     int nStartingRecordID = *(int*)lpParam;
  588.     
  589.     Ndb* pNdb;
  590.     char* pchContextData = new char[g_nStatusDataSize];
  591.     char szMsg[100];
  592.     
  593.     int iRes;
  594.     const char* szOp;
  595.     long iVersion;
  596.     long iLockFlag;
  597.     long iLockTime;
  598.     long iLockTimeUSec;
  599.     
  600.     pNdb = new Ndb("TEST_DB");
  601.     if(!pNdb)
  602.     {
  603.         NdbMutex_Lock(g_pNdbMutexPrintf);
  604.         printf("new Ndb failedn");
  605.         NdbMutex_Unlock(g_pNdbMutexPrintf);
  606.         delete[] pchContextData;
  607.         return 0;
  608.     }
  609.     
  610.     if(pNdb->init(1) || pNdb->waitUntilReady())
  611.     {
  612.         ReportNdbError("init of Ndb failed", pNdb->getNdbError());
  613.         delete pNdb;
  614.         delete[] pchContextData;
  615.         return 0;
  616.     }
  617.     if(g_bInsertInitial)
  618.     {
  619.         if(InsertInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread))
  620.         {
  621.             delete pNdb;
  622.             delete[] pchContextData;
  623.             return 0;
  624.         }
  625.     }
  626.     if(g_bVerifyInitial)
  627.     {
  628.         NdbError err;
  629.         memset(&err, 0, sizeof(err));
  630.         if(VerifyInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread))
  631.         {
  632.             delete pNdb;
  633.             delete[] pchContextData;
  634.             return 0;
  635.         }
  636.     }
  637.     if(g_bInsertInitial || g_bVerifyInitial)
  638.     {
  639.         delete[] pchContextData;
  640.         return 0;
  641.     }
  642.     long nContextID = nStartingRecordID;
  643. #ifdef NDB_WIN32
  644.     while(WaitForSingleObject(hShutdownEvent,0) != WAIT_OBJECT_0)
  645. #else
  646.     while(!bShutdownEvent)
  647. #endif
  648.     {
  649.         ++nContextID;
  650.         nContextID %= g_nMaxContextIdPerThread;
  651.         nContextID += nStartingRecordID;
  652.         bool bTimeLatency = (nContextID==100);
  653.         
  654.         NDB_TICKS tStartCall = NdbTick_CurrentMillisecond();
  655.         for (int i=0; i < 20; i++)
  656.         {
  657.             int nRetry = 0;
  658.             NdbError err;
  659.             memset(&err, 0, sizeof(err));
  660.             NDB_TICKS tStartTrans = NdbTick_CurrentMillisecond();
  661.             switch(i)
  662.             {
  663.             case 3:
  664.             case 6:
  665.             case 9: 
  666.             case 11: 
  667.             case 12: 
  668.             case 15: 
  669.             case 18:   // Query Record
  670.                 szOp = "Read";
  671.                 iRes = RetryQueryTransaction(pNdb, nContextID, &iVersion, &iLockFlag, 
  672.                     &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry);
  673.                 break;
  674.                 
  675.             case 19:    // Delete Record
  676.                 szOp = "Delete";
  677.                 iRes = RetryDeleteTransaction(pNdb, nContextID, err, nRetry);
  678.                 break;
  679.                 
  680.             case 0: // Insert Record
  681.                 szOp = "Insert";
  682.                 iRes = RetryInsertTransaction(pNdb, nContextID, 1, 1, 1, 1, STATUS_DATA, err, nRetry);
  683.                 break;
  684.                 
  685.             default:    // Update Record
  686.                 szOp = "Update";
  687.                 iRes = RetryUpdateTransaction(pNdb, nContextID, err, nRetry);
  688.                 break;
  689.             }
  690.             NDB_TICKS tEndTrans = NdbTick_CurrentMillisecond();
  691.             long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
  692.             if(g_bReport)
  693.             {
  694.               assert(lMillisecForThisTrans>=0 && lMillisecForThisTrans<c_nMaxMillisecForAllTrans);
  695.               InterlockedIncrement(g_plCountMillisecForTrans+lMillisecForThisTrans);
  696.             }
  697.             if(nRetry>0)
  698.             {
  699.                 sprintf(szMsg, "%s retried %d times, time %ld msec.", 
  700.                     szOp, nRetry, lMillisecForThisTrans);
  701.                 ReportNdbError(szMsg, err);
  702.             }
  703.             else if(bTimeLatency)
  704.             {
  705.                 NdbMutex_Lock(g_pNdbMutexPrintf);
  706.                 printf("%s = %ld msec.n", szOp, lMillisecForThisTrans);
  707.                 NdbMutex_Unlock(g_pNdbMutexPrintf);
  708.             }
  709.             if(iRes)
  710.             {
  711.                 sprintf(szMsg, "%s failed after %ld calls, terminating thread", 
  712.                     szOp, nNumCallsProcessed);
  713.                 ReportNdbError(szMsg, err);
  714.                 delete pNdb;
  715.                 delete[] pchContextData;
  716.                 return 0;
  717.             }
  718.         }
  719.         NDB_TICKS tEndCall = NdbTick_CurrentMillisecond();
  720.         long lMillisecForThisCall = (long)(tEndCall-tStartCall);
  721.         if(g_bReport)
  722.         {
  723.           assert(lMillisecForThisCall>=0 && lMillisecForThisCall<c_nMaxMillisecForAllCall);
  724.           InterlockedIncrement(g_plCountMillisecForCall+lMillisecForThisCall);
  725.         }
  726.         if(bTimeLatency)
  727.         {
  728.             NdbMutex_Lock(g_pNdbMutexPrintf);
  729.             printf("Total time for call is %ld msec.n", (long)lMillisecForThisCall);
  730.             NdbMutex_Unlock(g_pNdbMutexPrintf);
  731.         }
  732.         
  733.         nNumCallsProcessed++;
  734.         InterlockedIncrementAndReport();
  735.         if(g_nMaxCallsPerSecond>0)
  736.         {
  737.             int iMillisecToSleep = (1000*g_nNumThreads)/g_nMaxCallsPerSecond;
  738.             iMillisecToSleep -= lMillisecForThisCall;
  739.             if(iMillisecToSleep>0)
  740.             {
  741.                 NdbSleep_MilliSleep(iMillisecToSleep);
  742.             }
  743.         }
  744.     }
  745.     NdbMutex_Lock(g_pNdbMutexPrintf);
  746.     printf("Terminating thread after %ld callsn", nNumCallsProcessed);
  747.     NdbMutex_Unlock(g_pNdbMutexPrintf);
  748.     
  749.     delete pNdb;
  750.     delete[] pchContextData;
  751.     return 0;
  752. }
  753. int CreateCallContextTable(Ndb* pNdb, const char* szTableName, bool bStored)
  754. {
  755.     int iRes = -1;
  756.     NdbError err;
  757.     memset(&err, 0, sizeof(err));
  758.     NdbSchemaCon* pNdbSchemaCon = NdbSchemaCon::startSchemaTrans(pNdb);
  759.     if(pNdbSchemaCon)
  760.     {
  761.         NdbSchemaOp* pNdbSchemaOp = pNdbSchemaCon->getNdbSchemaOp();
  762.         if(pNdbSchemaOp)
  763.         {
  764.             if(!pNdbSchemaOp->createTable(szTableName, 8, TupleKey, 2, 
  765.                 All, 6, 78, 80, 1, bStored)
  766.                 && !pNdbSchemaOp->createAttribute(c_szContextId, TupleKey, 32, 1, Signed)
  767.                 && !pNdbSchemaOp->createAttribute(c_szVersion, NoKey, 32, 1, Signed)
  768.                 && !pNdbSchemaOp->createAttribute(c_szLockFlag, NoKey, 32, 1, Signed)
  769.                 && !pNdbSchemaOp->createAttribute(c_szLockTime, NoKey, 32, 1, Signed)
  770.                 && !pNdbSchemaOp->createAttribute(c_szLockTimeUSec, NoKey, 32, 1, Signed)
  771.                 && !pNdbSchemaOp->createAttribute(c_szContextData, NoKey, 8, g_nStatusDataSize, String)) 
  772.             {
  773.                 if(!pNdbSchemaCon->execute()) 
  774.                     iRes = 0;
  775.                 else 
  776.                     err = pNdbSchemaCon->getNdbError();
  777.             } 
  778.             else 
  779.                 err = pNdbSchemaOp->getNdbError();
  780.         } 
  781.         else 
  782.             err = pNdbSchemaCon->getNdbError();
  783.         NdbSchemaCon::closeSchemaTrans(pNdbSchemaCon);
  784.     }
  785.     else 
  786.         err = pNdb->getNdbError();
  787.     if(iRes)
  788.     {
  789.         ReportNdbError("create call context table failed", err);
  790.     }
  791.     return iRes;
  792. }
  793. void ReportResponseTimeStatistics(const char* szStat, long* plCount, const long lSize)
  794. {
  795.   long lCount = 0;
  796.   Int64 llSum = 0;
  797.   Int64 llSum2 = 0;
  798.   long lMin = -1;
  799.   long lMax = -1;
  800.   for(long l=0; l<lSize; ++l)
  801.   {
  802.     if(plCount[l]>0)
  803.     {
  804.       lCount += plCount[l];
  805.       llSum += (Int64)l*(Int64)plCount[l];
  806.       llSum2 += (Int64)l*(Int64)l*(Int64)plCount[l];
  807.       if(lMin==-1 || l<lMin)
  808.       {
  809.         lMin = l;
  810.       }
  811.       if(lMax==-1 || l>lMax)
  812.       {
  813.         lMax = l;
  814.       }
  815.     }
  816.   }
  817.   long lAvg = long(llSum/lCount);
  818.   double dblVar = ((double)lCount*(double)llSum2 - (double)llSum*(double)llSum)/((double)lCount*(double)(lCount-1));
  819.   long lStd = long(sqrt(dblVar));
  820.   long lMed = -1;
  821.   long l95 = -1;
  822.   long lSel = -1;
  823.   for(long l=lMin; l<=lMax; ++l)
  824.   {
  825.     if(plCount[l]>0)
  826.     {
  827.       lSel += plCount[l];
  828.       if(lMed==-1 && lSel>=(lCount/2))
  829.       {
  830.         lMed = l;
  831.       }
  832.       if(l95==-1 && lSel>=((lCount*95)/100))
  833.       {
  834.         l95 = l;
  835.       }
  836.       if(g_bReportPlus)
  837.       {
  838.         printf("%ldt%ldn", l, plCount[l]);
  839.       }
  840.     }
  841.   }
  842.   printf("%s: Count=%ld, Min=%ld, Max=%ld, Avg=%ld, Std=%ld, Med=%ld, 95%%=%ldn",
  843.     szStat, lCount, lMin, lMax, lAvg, lStd, lMed, l95);
  844. }
  845. void ShowHelp(const char* szCmd)
  846. {
  847.     printf("%s -t<threads> [-s<seed>] [-b<batch>] [-c<maxcps>] [-m<size>] [-d] [-i] [-v] [-f] [-w] [-r[+]]n", szCmd);
  848.     printf("%s -?n", szCmd);
  849.     puts("-dttcreate the table");
  850.     puts("-ittinsert initial records");
  851.     puts("-vttverify initial records");
  852.     puts("-t<threads>tnumber of threads making calls");
  853.     puts("-s<seed>toffset for primary key");
  854.     puts("-b<batch>tbatch size per thread");
  855.     puts("-c<maxcps>tmax number of calls per second for this process");
  856.     puts("-m<size>tsize of context data");
  857.     puts("-fttno checkpointing and no logging");
  858.     puts("-wttuse writeTuple instead of insertTuple");
  859.     puts("-rttreport response time statistics");
  860.     puts("-r+ttreport response time distribution");
  861.     puts("-?tthelp");
  862. }
  863. int main(int argc, char* argv[])
  864. {
  865.     ndb_init();
  866.     int iRes = -1;
  867.     g_nNumThreads = 0;
  868.     g_nMaxCallsPerSecond = 0;
  869.     long nSeed = 0;
  870.     bool bStoredTable = true;
  871.     bool bCreateTable = false;
  872.     g_bWriteTuple = false;
  873.     g_bReport = false;
  874.     g_bReportPlus = false;
  875.     
  876.     for(int i=1; i<argc; ++i)
  877.     {
  878.         if(argv[i][0]=='-' || argv[i][0]=='/')
  879.         {
  880.             switch(argv[i][1])
  881.             {
  882.             case 't': 
  883.                 g_nNumThreads = atol(argv[i]+2); 
  884.                 break;
  885.             case 's': 
  886.                 nSeed = atol(argv[i]+2); 
  887.                 break;
  888.             case 'b': 
  889.                 g_nMaxContextIdPerThread = atol(argv[i]+2); 
  890.                 break;
  891.             case 'm': 
  892.                 g_nStatusDataSize = atol(argv[i]+2); 
  893.                 if(g_nStatusDataSize>sizeof(STATUS_DATA))
  894.                 {
  895.                     g_nStatusDataSize = sizeof(STATUS_DATA);
  896.                 }
  897.                 break;
  898.             case 'i': 
  899.                 g_bInsertInitial = true;
  900.                 break;
  901.             case 'v': 
  902.                 g_bVerifyInitial = true;
  903.                 break;
  904.             case 'd':
  905.                 bCreateTable = true;
  906.                 break;
  907.             case 'f': 
  908.                 bStoredTable = false;
  909.                 break;
  910.             case 'w': 
  911.                 g_bWriteTuple = true;
  912.                 break;
  913.             case 'r': 
  914.                 g_bReport = true;
  915.                 if(argv[i][2]=='+')
  916.                 {
  917.                   g_bReportPlus = true;
  918.                 }
  919.                 break;
  920.             case 'c':
  921.                 g_nMaxCallsPerSecond = atol(argv[i]+2);
  922.                 break;
  923.             case '?':
  924.             default:
  925.                 ShowHelp(argv[0]);
  926.                 return -1;
  927.             }
  928.         }
  929.         else
  930.         {
  931.             ShowHelp(argv[0]);
  932.             return -1;
  933.         }
  934.     }
  935.     if(bCreateTable)
  936.         puts("-dtcreate the table");
  937.     if(g_bInsertInitial)
  938.         printf("-itinsert initial recordsn");
  939.     if(g_bVerifyInitial)
  940.         printf("-vtverify initial recordsn");
  941.     if(g_nNumThreads>0)
  942.         printf("-t%ldtnumber of threads making callsn", g_nNumThreads);
  943.     if(g_nNumThreads>0)
  944.     {
  945.         printf("-s%ldtoffset for primary keyn", nSeed);
  946.         printf("-b%ldtbatch size per threadn", g_nMaxContextIdPerThread);
  947.     }
  948.     if(g_nMaxCallsPerSecond>0)
  949.         printf("-c%ldtmax number of calls per second for this processn", g_nMaxCallsPerSecond);
  950.     if(!bStoredTable)
  951.         puts("-ftno checkpointing and no logging to disk");
  952.     if(g_bWriteTuple)
  953.         puts("-wtuse writeTuple instead of insertTuple");
  954.     if(g_bReport)
  955.         puts("-rtreport response time statistics");
  956.     if(g_bReportPlus)
  957.         puts("-r+treport response time distribution");
  958.     if(!bCreateTable && g_nNumThreads<=0)
  959.     {
  960.         ShowHelp(argv[0]);
  961.         return -1;
  962.     }
  963.     printf("-m%ldtsize of context datan", g_nStatusDataSize);
  964.     g_szTableName = (bStoredTable ? c_szTableNameStored : c_szTableNameTemp);
  965.     
  966. #ifdef NDB_WIN32
  967.     SetConsoleCtrlHandler(ConsoleCtrlHandler, true); 
  968. #else
  969.     signal(SIGINT, CtrlCHandler);
  970. #endif
  971.     if(g_bReport)
  972.     {
  973.       g_plCountMillisecForCall = new long[c_nMaxMillisecForAllCall];
  974.       memset(g_plCountMillisecForCall, 0, c_nMaxMillisecForAllCall*sizeof(long));
  975.       g_plCountMillisecForTrans = new long[c_nMaxMillisecForAllTrans];
  976.       memset(g_plCountMillisecForTrans, 0, c_nMaxMillisecForAllTrans*sizeof(long));
  977.     }
  978.     
  979.     g_pNdbMutexIncrement = NdbMutex_Create();
  980.     g_pNdbMutexPrintf = NdbMutex_Create();
  981. #ifdef NDB_WIN32
  982.     hShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL);
  983. #endif
  984.     
  985.     Ndb* pNdb = new Ndb(c_szDatabaseName);
  986.     if(!pNdb)
  987.     {
  988.         printf("could not construct ndbn");
  989.         return 1;
  990.     }
  991.     
  992.     if(pNdb->init(1) || pNdb->waitUntilReady())
  993.     {
  994.         ReportNdbError("could not initialize ndbn", pNdb->getNdbError());
  995.         delete pNdb;
  996.         return 2;
  997.     }
  998.     if(bCreateTable)
  999.     {
  1000.         printf("Create CallContext tablen");
  1001. if (bStoredTable)
  1002. {
  1003.   if (CreateCallContextTable(pNdb, c_szTableNameStored, true))
  1004.   {
  1005.             printf("Create table failedn");
  1006.             delete pNdb;
  1007.             return 3;     
  1008.   }
  1009. }
  1010. else
  1011. {
  1012.   if (CreateCallContextTable(pNdb, c_szTableNameTemp, false))
  1013.   {
  1014.             printf("Create table failedn");
  1015.             delete pNdb;
  1016.             return 3;
  1017.   }
  1018. }
  1019.     }
  1020.     
  1021.     if(g_nNumThreads>0) 
  1022.     {
  1023.         printf("creating %d threadsn", (int)g_nNumThreads);
  1024.         if(g_bInsertInitial)
  1025.         {
  1026.             printf("each thread will insert %ld initial records, total %ld insertsn", 
  1027.                 g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread);
  1028.         }
  1029.         if(g_bVerifyInitial)
  1030.         {
  1031.             printf("each thread will verify %ld initial records, total %ld readsn", 
  1032.                 g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread);
  1033.         }
  1034.         g_nNumberOfInitialInsert = 0;
  1035.         g_nNumberOfInitialVerify = 0;
  1036.         NDB_TICKS tStartTime = NdbTick_CurrentMillisecond();
  1037.         NdbThread* pThreads[256];
  1038.         int pnStartingRecordNum[256];
  1039.         int ij;
  1040.         for(ij=0;ij<g_nNumThreads;ij++) 
  1041.         {
  1042.             pnStartingRecordNum[ij] = (ij*g_nMaxContextIdPerThread) + nSeed;
  1043.         }
  1044.         
  1045.         for(ij=0;ij<g_nNumThreads;ij++) 
  1046.         {
  1047.             pThreads[ij] = NdbThread_Create(RuntimeCallContext, 
  1048.                 (void**)(pnStartingRecordNum+ij), 
  1049.                 0, "RuntimeCallContext", NDB_THREAD_PRIO_LOW);
  1050.         }
  1051.         
  1052.         //Wait for the threads to finish
  1053.         for(ij=0;ij<g_nNumThreads;ij++) 
  1054.         {
  1055.             void* status;
  1056.             NdbThread_WaitFor(pThreads[ij], &status);
  1057.         }
  1058.         NDB_TICKS tEndTime = NdbTick_CurrentMillisecond();
  1059.         
  1060.         //Print time taken
  1061.         printf("Time Taken for %ld Calls is %ld msec (= %ld calls/sec)n",
  1062.             g_nNumCallsProcessed, 
  1063.             (long)(tEndTime-tStartTime), 
  1064.             (long)((1000*g_nNumCallsProcessed)/(tEndTime-tStartTime)));
  1065.         if(g_bInsertInitial)
  1066.             printf("successfully inserted %ld tuplesn", g_nNumberOfInitialInsert);
  1067.         if(g_bVerifyInitial)
  1068.             printf("successfully verified %ld tuplesn", g_nNumberOfInitialVerify);
  1069.     }
  1070.     
  1071.     delete pNdb;
  1072. #ifdef NDB_WIN32
  1073.     CloseHandle(hShutdownEvent);
  1074. #endif
  1075.     NdbMutex_Destroy(g_pNdbMutexIncrement);
  1076.     NdbMutex_Destroy(g_pNdbMutexPrintf);
  1077.     if(g_bReport)
  1078.     {
  1079.       ReportResponseTimeStatistics("Calls", g_plCountMillisecForCall, c_nMaxMillisecForAllCall);
  1080.       ReportResponseTimeStatistics("Transactions", g_plCountMillisecForTrans, c_nMaxMillisecForAllTrans);
  1081.       delete[] g_plCountMillisecForCall;
  1082.       delete[] g_plCountMillisecForTrans;
  1083.     }
  1084.     return 0;
  1085. }