cutil.c
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:14k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2009, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include "tx/request.h"
  19. static product_t *prods;
  20. static char testid[16];
  21. static char *emps[] = {"8000", "8001", "8002", "8003", "8004", "8005", "8006", "8007"};
  22. static void set_test_id(const char *id) {
  23. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  24. (void) strncpy(testid, id, sizeof (testid));
  25. }
  26. static int send_req(test_req_t *req, char **prbuf) {
  27. long rsz = sizeof (test_req_t);
  28. long callflags = 0L; /*TPNOTIME;*/
  29. test_req_t *resp;
  30. int rv = 0;
  31. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  32. resp = (test_req_t *) tpalloc((char*) "X_C_TYPE", (char*) "test_req", 0);
  33. userlogc_debug( "TxLog Invoke Service %s %4d: prod=%d op=%c data=%s dbf=%s tx=%d",
  34. TXTEST_SVC_NAME, req->id, req->prod, req->op, req->data, req->db, req->txtype);
  35. if (tpcall((char *) TXTEST_SVC_NAME, (char *) req, sizeof (test_req_t), (char **) &resp, &rsz, callflags) == -1) {
  36. userlogc_debug( "TxLog TP ERROR tperrno: %d", tperrno);
  37. rv = -1;
  38. } else if (prbuf && *prbuf) {
  39. strncpy(*prbuf, resp->data, sizeof (resp->data));
  40. }
  41. userlogc_debug( "TxLog %s:%d tpcall res=%d status=%d", __FUNCTION__, __LINE__,
  42. rv, (resp ? resp->status : -2));
  43. tpfree((char *) resp);
  44. return rv;
  45. }
  46. static int count_records(const char *msg, char *key, int in_tx, int expect) {
  47. int cnt = -1;
  48. userlogc_debug( "TxLog %s:%d msg=%s key=%s in_tx=%d expect=%d", __FUNCTION__, __LINE__, msg, key, in_tx, expect);
  49. if (in_tx || start_tx(TX_TYPE_BEGIN) == TX_OK) {
  50. int rv = 0;
  51. test_req_t *req;
  52. test_req_t res;
  53. product_t *p = prods;
  54. char *rbuf = (char *) (res.data);
  55. for (p = prods; p->id != -1; p++) {
  56. int remote = LOCAL_ACCESS;
  57. userlogc_debug( "TxLog check product %s", p->pname);
  58. if (p->pname == NULL)
  59. continue;
  60. if ((p->loc & remote) == 0) /* the RM does not support the requested access type */
  61. remote = p->loc;
  62. req = get_buf((remote & REMOTE_ACCESS), key, p->dbname, '1', p->id, TX_TYPE_NONE, expect);
  63. userlogc_debug( "TxLog invoke prod %s (id=%d) remote=%d dbf=%s", p->pname, p->id, remote, p->dbname);
  64. userlogc_debug( "TxLog xyz_access op=%c data=%s db=%s", req->op, req->data, req->db);
  65. rv = ((remote & REMOTE_ACCESS) ? send_req(req, &rbuf) : p->access(req, &res));
  66. userlogc_debug( "TxLog invoked ok");
  67. if (rv)
  68. userlogc_warn( "TxLog BAD REQ %d", rv);
  69. free_buf((remote & REMOTE_ACCESS), req);
  70. rv = (rv == 0 ? atoi(res.data) : -1);
  71. userlogc_debug( "TxLog and count is %d", rv);
  72. if (rv == -1) {
  73. userlogc_warn( "TxLog Error: Db %d access error", p->id);
  74. }
  75. userlogc_debug( "TxLog product %s (id=%d) dbf=%s returned %d records", p->pname, p->id, p->dbname, rv);
  76. if (rv != cnt && cnt != -1) {
  77. userlogc_warn( "TxLog All databases should have the same no of records: db %d cnt %d (prev was %d)", p->id, rv, cnt);
  78. rv = -1;
  79. }
  80. if ((cnt = rv) == -1)
  81. break;
  82. }
  83. if (in_tx || end_tx(TX_TYPE_COMMIT) == TX_OK)
  84. return cnt;
  85. }
  86. return cnt;
  87. }
  88. static int check_count(const char *msg, char *key, int in_tx, int expect) {
  89. int rcnt;
  90. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  91. if ((rcnt = count_records(msg, key, 0, expect)) != expect) {
  92. userlogc_warn( "TxLog %s: WRONG NUMBER OF RECORDS: %d expected %d", msg, rcnt, expect);
  93. return -1;
  94. }
  95. userlogc( "TxLog %s: %s: RECORD COUNT: %d expected %d", testid, msg, rcnt, expect);
  96. return 0;
  97. }
  98. static int db_op(const char *msg, const char *data, char op, int txtype,
  99.  char **prbuf, int remote, int migrating, int expect) {
  100. /* userlogc( "TxLog %s: %s %s", testid, ((remote & REMOTE_ACCESS) ? "REMOTE" : "LOCAL"), msg);*/
  101. if (start_tx(txtype) == TX_OK) {
  102. int rv = 0;
  103. test_req_t *req;
  104. test_req_t res;
  105. product_t *p = prods;
  106. for (p = prods; p->id != -1; p++) {
  107. #if 0
  108. if (migrating && (p->xaflags() & TMNOMIGRATE)) {
  109. /* the RM does not support tx migration (see XA spec for explanation */
  110. userlogc_warn( "TxLog Info: RM %d does not support tx migration (switching from remote)", p->id);
  111. remote = !remote;
  112. }
  113. #endif
  114. if ((p->loc & remote) == 0) /* the RM does not support the requested access type */
  115. remote = p->loc;
  116. userlogc( "TxLog %s: prod: %s %s %s", testid, p->pname,
  117. ((remote & REMOTE_ACCESS) ? "REMOTE" : "LOCAL"), msg);
  118. req = get_buf((remote & REMOTE_ACCESS), data, p->dbname, op, p->id, txtype, expect);
  119. userlogc_debug( "TxLog invoke prod %s (id=%d) remote=%d dbf=%s op=%c data=%s db=%s",
  120. p->pname, p->id, remote, p->dbname, req->op, req->data, req->db);
  121. rv = ((remote & REMOTE_ACCESS) ? send_req(req, prbuf) : p->access(req, &res));
  122. if (rv)
  123. userlogc_warn( "TxLog BAD REQ %d db name: %s", rv, p->dbname);
  124. free_buf((remote & REMOTE_ACCESS), req);
  125. }
  126. if (end_tx(txtype) == TX_OK)
  127. return rv;
  128. }
  129. return -1;
  130. }
  131. /**
  132.  * ensure that all the target databases start off in the same state
  133.  */
  134. static int setup() {
  135. int rv;
  136. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  137. /* start off with no records */
  138. if ((rv = db_op("DELETE AT SETUP", emps[0], '3', TX_TYPE_BEGIN_COMMIT, 0, LOCAL_ACCESS, 0, -1)) != 0)
  139. return rv;
  140. if ((rv = check_count("COUNT RECORDS", emps[0], 0, 0)))
  141. return rv;
  142. return 0;
  143. }
  144. /**
  145.  * remove all the records added by the test
  146.  */
  147. static int teardown(int *cnt)
  148. {
  149. int rv;
  150. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  151. /* delete records starting from emps[0], */
  152. if ((rv = db_op("DELETE", emps[0], '3', TX_TYPE_BEGIN_COMMIT, 0, LOCAL_ACCESS, 0, -1)))
  153. return rv;
  154. *cnt = 0;
  155. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt)))
  156. return rv;
  157. return 0;
  158. }
  159. /**
  160.  * start a transaction
  161.  * perform a remote insert on each target db
  162.  * perform another remote insert on each target db
  163.  * perform a local insert on each target db
  164.  * commit the transaction
  165.  *
  166.  * Note for databases that do not support tx migration (meaning that performing a
  167.  * remote insert followed by a local insert is not supported) the local operation
  168.  * is switched to a remote one (which ensures that all the updates are done in the
  169.  * same thread).
  170.  */
  171. static int test1(int *cnt)
  172. {
  173. int rv;
  174. userlogc_debug( "TxLog ENTER %s:%d", __FUNCTION__, __LINE__);
  175. set_test_id("Test 1");
  176. /* ask the remote service to insert a record */
  177. if ((rv = db_op("INSERT 1", emps[5], '0', TX_TYPE_BEGIN, 0, REMOTE_ACCESS, 0, -1))) {
  178. userlogc_warn( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  179. return rv;
  180. }
  181. /* ask the remote service to insert another record in the same transaction */
  182. if ((rv = db_op("INSERT 2", emps[6], '0', TX_TYPE_NONE, 0, REMOTE_ACCESS, 0, -1))) {
  183. userlogc_warn( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  184. return rv;
  185. }
  186. /* insert a record and end the already running transaction */
  187. if ((rv = db_op("INSERT", emps[7], '0', TX_TYPE_COMMIT, 0, LOCAL_ACCESS, 1, -1))) {
  188. userlogc_warn( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  189. return rv;
  190. }
  191. *cnt += 3;
  192. /* make sure the record count increases by 3 */
  193. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt))) {
  194. userlogc_warn( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  195. return -1;
  196. }
  197. userlogc_debug( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  198. return 0;
  199. }
  200. /**
  201.  * start a transaction
  202.  * perform a remote insert on each target db
  203.  * perform another remote insert on each target db
  204.  * perform a third remote insert on each target db
  205.  * commit the transaction
  206.  */
  207. static int test2(int *cnt)
  208. {
  209. int rv;
  210. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  211. set_test_id("Test 2");
  212. /* ask the remote service to insert a record */
  213. if ((rv = db_op("INSERT 1", emps[0], '0', TX_TYPE_BEGIN, 0, REMOTE_ACCESS, 0, -1)))
  214. return rv;
  215. /* ask the remote service to insert another record in the same transaction */
  216. if ((rv = db_op("INSERT 2", emps[1], '0', TX_TYPE_NONE, 0, REMOTE_ACCESS, 0, -1)))
  217. return rv;
  218. /* insert a record and end the already running transaction */
  219. if ((rv = db_op("INSERT 3", emps[2], '0', TX_TYPE_COMMIT, 0, REMOTE_ACCESS, 0, -1)))
  220. return rv;
  221. *cnt += 3;
  222. /* make sure the record count increases by 3 */
  223. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt)))
  224. return -1;
  225. return 0;
  226. }
  227. /**
  228.  * start a transaction
  229.  * insert a record into each db
  230.  * commit the transaction
  231.  *
  232.  * repeat the test but rollback the transaction instead of commit
  233.  */
  234. static int test3(int *cnt)
  235. {
  236. int rv;
  237. userlogc_debug( "TxLog IN %s:%d", __FUNCTION__, __LINE__);
  238. set_test_id("Test 3");
  239. /* ask the remote service to insert a record */
  240. if ((rv = db_op("INSERT 1", emps[3], '0', TX_TYPE_BEGIN_COMMIT, 0, REMOTE_ACCESS, 0, -1)))
  241. return rv;
  242. *cnt += 1;
  243. /* delete records starting from emps[0] but abort it */
  244. if ((rv = db_op("INSERT WITH ABORT", emps[4], '1', TX_TYPE_BEGIN_ABORT, 0, REMOTE_ACCESS, 0, -1)))
  245. return rv;
  246. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt)))
  247. return rv;
  248. userlogc_debug( "TxLog OUT %s:%d success", __FUNCTION__, __LINE__);
  249. return 0;
  250. }
  251. /**
  252.  * start tx, do a remote insert on all dbs commit the tx
  253.  * start tx, do a remote update on all dbs commit the tx
  254.  * start tx, do a remote delete on all dbs abort the tx
  255.  * start tx, do a local delete on all dbs abort the tx
  256.  */
  257. static int test4(int *cnt)
  258. {
  259. int rv;
  260. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  261. set_test_id("Test 4");
  262. /* ask the remote service to insert a record */
  263. if ((rv = db_op("INSERT 1", emps[4], '0', TX_TYPE_BEGIN_COMMIT, 0, REMOTE_ACCESS, 0, -1)))
  264. return rv;
  265. *cnt += 1;
  266. /* modify one of the records */
  267. if ((rv = db_op("UPDATE", emps[4], '2', TX_TYPE_BEGIN_COMMIT, 0, REMOTE_ACCESS, 0, -1)))
  268. return rv;
  269. if ((rv = check_count("test4 COUNT RECORDS", emps[0], 0, *cnt)))
  270. return rv;
  271. /* remote delete records starting from emps[0] with abort*/
  272. if ((rv = db_op("DELETE WITH ABORT", emps[0], '3', TX_TYPE_BEGIN_ABORT, 0, REMOTE_ACCESS, 0, -1)))
  273. return rv;
  274. /* local delete records starting from emps[0] with abort*/
  275. if ((rv = db_op("DELETE WITH ABORT", emps[0], '3', TX_TYPE_BEGIN_ABORT, 0, LOCAL_ACCESS, 0, -1)))
  276. return rv;
  277. if ((rv = check_count("test4 DELETE WITH ABORT: COUNT RECORDS", emps[0], 0, *cnt)))
  278. return rv;
  279. return 0;
  280. }
  281. /**
  282.  * start a transaction
  283.  * insert a record into each db
  284.  * rollback the transaction
  285.  */
  286. static int test5(int *cnt)
  287. {
  288. int rv;
  289. userlogc_debug( "TxLog IN %s:%d", __FUNCTION__, __LINE__);
  290. set_test_id("Test 5");
  291. /* ask the remote service to insert a record */
  292. if ((rv = db_op("INSERT 1", emps[3], '0', TX_TYPE_BEGIN_ABORT, 0, REMOTE_ACCESS, 0, -1)))
  293. return rv;
  294. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt)))
  295. return rv;
  296. userlogc_debug( "TxLog OUT %s:%d success", __FUNCTION__, __LINE__);
  297. return 0;
  298. }
  299. /* cause the program to halt during phase 2 of the transaction 2PC protocol */
  300. static int testrc(int *cnt)
  301. {
  302. int rv;
  303. #if TX_RC == 1
  304. int fault_type = TX_TYPE_BEGIN_COMMIT_HALT;
  305. #elif TX_RC == 2
  306. int fault_type = TX_TYPE_BEGIN_PREPARE_HALT;
  307. #else
  308. int fault_type = TX_TYPE_BEGIN_COMMIT;
  309. #endif
  310. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  311. set_test_id("Test 5");
  312. /* ask the remote service to insert a record but to halt during commit */
  313. if ((rv = tx_set_commit_return(TX_COMMIT_COMPLETED))) /* report heuristics */
  314. userlogc_warn( "tx_set_commit_return error: %d" , rv);
  315. if ((rv = db_op("INSERT 1", emps[4], '0', fault_type, 0, REMOTE_ACCESS, 0, -1))) {
  316. userlogc_warn( "tpcall error %d" , rv);
  317. return rv;
  318. }
  319. /*
  320.  * Unfortunately there is no way of knowing the outcome of commiting the transaction.
  321.  * TODO check the circumstances under which heuristics are generated
  322.  * I also need to figure out why killing the server a number of times corrupts the
  323.  * Berkeley DB (which makes recovery kind of useless)
  324.  */
  325. userlogc("TxLog %s: tx status: %d (should be -1 meaning no transaction)", testid, get_tx_status());
  326. (void) tx_set_commit_return(TX_COMMIT_DECISION_LOGGED); /* return when the commit decision is logged */
  327. return 0;
  328. }
  329. #define xONETEST test5
  330. int run_tests(product_t *prod_array)
  331. {
  332. int rv, i, cnt = 0;
  333. struct test {
  334. int (*test)(int *);
  335. } tests[] = {
  336. #if defined(TX_RC)   // test recovery
  337. {testrc},
  338. #elif defined(ONETEST)
  339. {ONETEST},
  340. #elif defined(TX_MCALLS)   // tx extends over multiple tpacalls
  341. {test1}, {test2},
  342. #elif defined(TX_SCALL)  // tx is active for a single tpacall
  343. {test3}, {test4},
  344. #else
  345. {test1}, {test2}, {test3}, {test4},
  346. #endif
  347. {0}
  348. };
  349. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  350. prods = prod_array;
  351. if (tx_open() != TX_OK)
  352. return fatal("TxLog ERROR - Could not open RMs");
  353. set_test_id("setup");
  354. if ((rv = setup()))
  355. return rv;
  356. for (i = 0; tests[i].test != 0; i++) {
  357. if ((rv = tests[i].test(&cnt))) {
  358. userlogc_warn( (char*) "TxLog %s FAILED", testid);
  359. return rv;
  360. }
  361. userlogc( (char*) "TxLog %s PASSED", testid);
  362. }
  363. userlogc( (char*) "TxLog Tests complete");
  364. set_test_id("teardown");
  365. if ((rv = teardown(&cnt)))
  366. return rv;
  367. if (tx_close() != TX_OK) {
  368. userlogc_warn( (char*) "TxLog ERROR - Could not close transaction: ");
  369. return fatal("ERROR - Could not close RMs");
  370. }
  371. return 0;
  372. }