testLcp.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:8k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. #include <NDBT.hpp>
  2. #include <NdbApi.hpp>
  3. #include <NdbRestarter.hpp>
  4. #include <HugoOperations.hpp>
  5. #include <UtilTransactions.hpp>
  6. #include <signaldata/DumpStateOrd.hpp>
  7. struct CASE 
  8. {
  9.   bool start_row;
  10.   bool end_row;
  11.   bool curr_row;
  12.   const char * op1;
  13.   const char * op2;
  14.   int val;
  15. };
  16. static CASE g_ops[] =
  17. {
  18.   { false, true,  false, "INSERT", 0,        0 },
  19.   { false, true,  false, "INSERT", "UPDATE", 0 },
  20.   { false, false, false, "INSERT", "DELETE", 0 },
  21.   { true,  true,  false, "UPDATE", 0,        0 },
  22.   { true,  true,  false, "UPDATE", "UPDATE", 0 },
  23.   { true,  false, false, "UPDATE", "DELETE", 0 },
  24.   { true,  false, false, "DELETE", 0,        0 },
  25.   { true,  true,  false, "DELETE", "INSERT", 0 }
  26. };
  27. const size_t OP_COUNT = (sizeof(g_ops)/sizeof(g_ops[0]));
  28. static Ndb* g_ndb = 0;
  29. static CASE* g_cases;
  30. static HugoOperations* g_hugo_ops;
  31. static int g_rows = 1000;
  32. static int g_setup_tables = 1;
  33. static const char * g_tablename = "T1";
  34. static const NdbDictionary::Table* g_table = 0;
  35. static NdbRestarter g_restarter;
  36. static int init_ndb(int argc, char** argv);
  37. static int parse_args(int argc, char** argv);
  38. static int connect_ndb();
  39. static int drop_all_tables();
  40. static int load_table();
  41. static int pause_lcp();
  42. static int do_op(int row);
  43. static int continue_lcp(int error);
  44. static int commit();
  45. static int restart();
  46. static int validate();
  47. #define require(x) { bool b = x; if(!b){g_err << __LINE__ << endl; abort();}}
  48. int 
  49. main(int argc, char ** argv){
  50.   require(!init_ndb(argc, argv));
  51.   require(!parse_args(argc, argv));
  52.   require(!connect_ndb());
  53.   
  54.   if(g_setup_tables){
  55.     require(!drop_all_tables());
  56.     
  57.     if(NDBT_Tables::createTable(g_ndb, g_tablename) != 0){
  58.       exit(-1);
  59.     }
  60.   }
  61.   
  62.   g_table = g_ndb->getDictionary()->getTable(g_tablename);
  63.   if(g_table == 0){
  64.     g_err << "Failed to retreive table: " << g_tablename << endl;
  65.     exit(-1);
  66.   }
  67.   require(g_hugo_ops = new HugoOperations(* g_table));
  68.   require(!g_hugo_ops->startTransaction(g_ndb));
  69.   
  70.   g_cases= new CASE[g_rows];
  71.   require(!load_table());
  72.   
  73.   g_info << "Performing all ops wo/ inteference of LCP" << endl;
  74.   
  75.   g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
  76.   g_info << "  where ZLCP_OP_WRITE_RT_BREAK is finished before SAVE_PAGES"
  77.  << endl;
  78.   require(!pause_lcp());
  79.   size_t j;
  80.   for(j = 0; j<g_rows; j++){
  81.     require(!do_op(j));
  82.   }
  83.   require(!continue_lcp(5900));
  84.   require(!commit());
  85.   require(!restart());
  86.   require(!validate());
  87.   
  88.   g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
  89.   g_info << "  where ZLCP_OP_WRITE_RT_BREAK is finished after SAVE_PAGES"
  90.  << endl;
  91.   require(!load_table());
  92.   require(!pause_lcp());
  93.   for(j = 0; j<g_rows; j++){
  94.     require(!do_op(j));
  95.   }
  96.   require(!continue_lcp(5901));
  97.   require(!commit());
  98.   require(!restart());
  99.   require(!validate());
  100.   
  101.   g_info << "Testing pre LCP operations, undo-ed at commit" << endl;
  102.   require(!load_table());
  103.   require(!pause_lcp());
  104.   for(j = 0; j<g_rows; j++){
  105.     require(!do_op(j));
  106.   }
  107.   require(!continue_lcp(5902));
  108.   require(!commit());
  109.   require(!continue_lcp(5903));
  110.   require(!restart());
  111.   require(!validate());
  112. }
  113. static int init_ndb(int argc, char** argv)
  114. {
  115.   return 0;
  116. }
  117. static int parse_args(int argc, char** argv)
  118. {
  119.   return 0;
  120. }
  121. static int connect_ndb()
  122. {
  123.   g_ndb = new Ndb("TEST_DB");
  124.   g_ndb->init();
  125.   if(g_ndb->waitUntilReady(30) == 0){
  126.     int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP };
  127.     return g_restarter.dumpStateAllNodes(args, 1);
  128.   }
  129.   return -1;
  130. }
  131. static int disconnect_ndb()
  132. {
  133.   delete g_ndb;
  134.   g_ndb = 0;
  135.   g_table = 0;
  136.   return 0;
  137. }
  138. static int drop_all_tables()
  139. {
  140.   NdbDictionary::Dictionary * dict = g_ndb->getDictionary();
  141.   require(dict);
  142.   BaseString db = g_ndb->getDatabaseName();
  143.   BaseString schema = g_ndb->getSchemaName();
  144.   NdbDictionary::Dictionary::List list;
  145.   if (dict->listObjects(list, NdbDictionary::Object::TypeUndefined) == -1){
  146.       g_err << "Failed to list tables: " << endl
  147.     << dict->getNdbError() << endl;
  148.       return -1;
  149.   }
  150.   for (unsigned i = 0; i < list.count; i++) {
  151.     NdbDictionary::Dictionary::List::Element& elt = list.elements[i];
  152.     switch (elt.type) {
  153.     case NdbDictionary::Object::SystemTable:
  154.     case NdbDictionary::Object::UserTable:
  155.       g_ndb->setDatabaseName(elt.database);
  156.       g_ndb->setSchemaName(elt.schema);
  157.       if(dict->dropTable(elt.name) != 0){
  158. g_err << "Failed to drop table: " 
  159.       << elt.database << "/" << elt.schema << "/" << elt.name <<endl;
  160. g_err << dict->getNdbError() << endl;
  161. return -1;
  162.       }
  163.       break;
  164.     case NdbDictionary::Object::UniqueHashIndex:
  165.     case NdbDictionary::Object::OrderedIndex:
  166.     case NdbDictionary::Object::HashIndexTrigger:
  167.     case NdbDictionary::Object::IndexTrigger:
  168.     case NdbDictionary::Object::SubscriptionTrigger:
  169.     case NdbDictionary::Object::ReadOnlyConstraint:
  170.     default:
  171.       break;
  172.     }
  173.   }
  174.   
  175.   g_ndb->setDatabaseName(db.c_str());
  176.   g_ndb->setSchemaName(schema.c_str());
  177.   
  178.   return 0;
  179. }
  180. static int load_table()
  181. {
  182.   UtilTransactions clear(* g_table);
  183.   require(!clear.clearTable(g_ndb));
  184.   
  185.   HugoOperations ops(* g_table);
  186.   require(!ops.startTransaction(g_ndb));
  187.   for(size_t i = 0; i<g_rows; i++){
  188.     g_cases[i] = g_ops[ i % OP_COUNT];
  189.     if(g_cases[i].start_row){
  190.       g_cases[i].curr_row = true;
  191.       g_cases[i].val = rand();
  192.       require(!ops.pkInsertRecord(g_ndb, i, 1, g_cases[i].val));
  193.     }
  194.     if((i+1) % 100 == 0){
  195.       require(!ops.execute_Commit(g_ndb));
  196.       require(!ops.getTransaction()->restart());
  197.     }
  198.   }
  199.   if((g_rows+1) % 100 != 0)
  200.     require(!ops.execute_Commit(g_ndb));
  201.   return 0;
  202. }
  203. static int pause_lcp()
  204. {
  205.   return 0;
  206. }
  207. static int do_op(int row)
  208. {
  209.   HugoOperations & ops = * g_hugo_ops;
  210.   if(strcmp(g_cases[row].op1, "INSERT") == 0){
  211.     require(!g_cases[row].curr_row);
  212.     g_cases[row].curr_row = true;
  213.     g_cases[row].val = rand();
  214.     require(!ops.pkInsertRecord(g_ndb, row, 1, g_cases[row].val));
  215.   } else if(strcmp(g_cases[row].op1, "UPDATE") == 0){
  216.     require(g_cases[row].curr_row);
  217.     g_cases[row].val = rand();
  218.     require(!ops.pkUpdateRecord(g_ndb, row, 1, g_cases[row].val));
  219.   } else if(strcmp(g_cases[row].op1, "DELETE") == 0){
  220.     require(g_cases[row].curr_row);    
  221.     g_cases[row].curr_row = false;
  222.     require(!ops.pkDeleteRecord(g_ndb, row, 1));
  223.   }
  224.   require(!ops.execute_NoCommit(g_ndb));
  225.   
  226.   if(g_cases[row].op2 == 0){
  227.   } else if(strcmp(g_cases[row].op2, "INSERT") == 0){
  228.     require(!g_cases[row].curr_row);
  229.     g_cases[row].curr_row = true;
  230.     g_cases[row].val = rand();
  231.     require(!ops.pkInsertRecord(g_ndb, row, 1, g_cases[row].val));
  232.   } else if(strcmp(g_cases[row].op2, "UPDATE") == 0){
  233.     require(g_cases[row].curr_row);
  234.     g_cases[row].val = rand();
  235.     require(!ops.pkUpdateRecord(g_ndb, row, 1, g_cases[row].val));
  236.   } else if(strcmp(g_cases[row].op2, "DELETE") == 0){
  237.     require(g_cases[row].curr_row);    
  238.     g_cases[row].curr_row = false;    
  239.     require(!ops.pkDeleteRecord(g_ndb, row, 1));
  240.   }
  241.   
  242.   if(g_cases[row].op2 != 0)
  243.     require(!ops.execute_NoCommit(g_ndb));  
  244.   return 0;
  245. }
  246. static int continue_lcp(int error)
  247. {
  248.   error = 0;
  249.   if(g_restarter.insertErrorInAllNodes(error) == 0){
  250.     int args[] = { DumpStateOrd::DihStartLcpImmediately };
  251.     return g_restarter.dumpStateAllNodes(args, 1);
  252.   }
  253.   return -1;
  254. }
  255. static int commit()
  256. {
  257.   HugoOperations & ops = * g_hugo_ops;  
  258.   int res = ops.execute_Commit(g_ndb);
  259.   if(res == 0){
  260.     return ops.getTransaction()->restart();
  261.   }
  262.   return res;
  263. }
  264. static int restart()
  265. {
  266.   g_info << "Restarting cluster" << endl;
  267.   disconnect_ndb();
  268.   delete g_hugo_ops;
  269.   
  270.   require(!g_restarter.restartAll());
  271.   require(!g_restarter.waitClusterStarted(30));
  272.   require(!connect_ndb());
  273.   
  274.   g_table = g_ndb->getDictionary()->getTable(g_tablename);
  275.   require(g_table);
  276.   require(g_hugo_ops = new HugoOperations(* g_table));
  277.   require(!g_hugo_ops->startTransaction(g_ndb));
  278.   return 0;
  279. }
  280. static int validate()
  281. {
  282.   HugoOperations ops(* g_table);
  283.   for(size_t i = 0; i<g_rows; i++){
  284.     require(g_cases[i].curr_row == g_cases[i].end_row);
  285.     require(!ops.startTransaction(g_ndb));
  286.     ops.pkReadRecord(g_ndb, i, 1);
  287.     int res = ops.execute_Commit(g_ndb);
  288.     if(g_cases[i].curr_row){
  289.       require(res == 0 && ops.verifyUpdatesValue(g_cases[i].val) == 0);
  290.     } else {
  291.       require(res == 626);
  292.     }
  293.     ops.closeTransaction(g_ndb);
  294.   }
  295.   return 0;
  296. }