cutil.c
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:14k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2009, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include "tx/request.h"
  19. static product_t *prods;
  20. static char testid[16];
  21. static char *emps[] = {"8000", "8001", "8002", "8003", "8004", "8005", "8006", "8007"};
  22. static void set_test_id(const char *id) {
  23. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  24. (void) strncpy(testid, id, sizeof (testid));
  25. }
  26. static int send_req(test_req_t *req, char **prbuf) {
  27. long rsz = sizeof (test_req_t);
  28. long callflags = 0L;
  29. test_req_t *resp;
  30. int rv = 0;
  31. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  32. resp = (test_req_t *) tpalloc((char*) "X_C_TYPE", (char*) "test_req", 0);
  33. userlogc_debug( "TxLog Invoke Service %s %4d: prod=%d op=%c data=%s dbf=%s tx=%d",
  34. TXTEST_SVC_NAME, req->id, req->prod, req->op, req->data, req->db, req->txtype);
  35. if (tpcall((char *) TXTEST_SVC_NAME, (char *) req, sizeof (test_req_t), (char **) &resp, &rsz, callflags) == -1) {
  36. userlogc_debug( "TxLog TP ERROR tperrno: %d", tperrno);
  37. rv = -1;
  38. } else if (prbuf && *prbuf) {
  39. strncpy(*prbuf, resp->data, sizeof (resp->data));
  40. }
  41. userlogc_debug( "TxLog %s:%d tpcall res=%d status=%d", __FUNCTION__, __LINE__, rv, resp->status);
  42. tpfree((char *) resp);
  43. return rv;
  44. }
  45. static int count_records(const char *msg, char *key, int in_tx, int expect) {
  46. int cnt = -1;
  47. userlogc_debug( "TxLog %s:%d msg=%s key=%s in_tx=%d expect=%d", __FUNCTION__, __LINE__, msg, key, in_tx, expect);
  48. if (in_tx || start_tx(TX_TYPE_BEGIN) == TX_OK) {
  49. int rv = 0;
  50. test_req_t *req;
  51. test_req_t res;
  52. product_t *p = prods;
  53. char *rbuf = (char *) (res.data);
  54. for (p = prods; p->id != -1; p++) {
  55. int remote = LOCAL_ACCESS;
  56. userlogc_debug( "TxLog check product %s", p->pname);
  57. if (p->pname == NULL)
  58. continue;
  59. if ((p->loc & remote) == 0) /* the RM does not support the requested access type */
  60. remote = p->loc;
  61. req = get_buf((remote & REMOTE_ACCESS), key, p->dbname, '1', p->id, TX_TYPE_NONE, expect);
  62. userlogc_debug( "TxLog invoke prod %s (id=%d) remote=%d dbf=%s", p->pname, p->id, remote, p->dbname);
  63. userlogc_debug( "TxLog xyz_access op=%c data=%s db=%s", req->op, req->data, req->db);
  64. rv = ((remote & REMOTE_ACCESS) ? send_req(req, &rbuf) : p->access(req, &res));
  65. userlogc_debug( "TxLog invoked ok");
  66. if (rv)
  67. userlogc_warn( "TxLog BAD REQ %d", rv);
  68. free_buf((remote & REMOTE_ACCESS), req);
  69. rv = (rv == 0 ? atoi(res.data) : -1);
  70. userlogc_debug( "TxLog and count is %d", rv);
  71. if (rv == -1) {
  72. userlogc_warn( "TxLog Error: Db %d access error", p->id);
  73. }
  74. userlogc_debug( "TxLog product %s (id=%d) dbf=%s returned %d records", p->pname, p->id, p->dbname, rv);
  75. if (rv != cnt && cnt != -1) {
  76. userlogc_warn( "TxLog All databases should have the same no of records: db %d cnt %d (prev was %d)", p->id, rv, cnt);
  77. rv = -1;
  78. }
  79. if ((cnt = rv) == -1)
  80. break;
  81. }
  82. if (in_tx || end_tx(TX_TYPE_COMMIT) == TX_OK)
  83. return cnt;
  84. }
  85. return cnt;
  86. }
  87. static int check_count(const char *msg, char *key, int in_tx, int expect) {
  88. int rcnt;
  89. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  90. if ((rcnt = count_records(msg, key, 0, expect)) != expect) {
  91. userlogc_warn( "TxLog WRONG NUMBER OF RECORDS: %d expected %d", rcnt, expect);
  92. return -1;
  93. }
  94. userlogc( "TxLog %s: RECORD COUNT: %d expected %d", testid, rcnt, expect);
  95. return 0;
  96. }
  97. static int db_op(const char *msg, const char *data, char op, int txtype,
  98.  char **prbuf, int remote, int migrating, int expect) {
  99. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  100. if (msg)
  101. userlogc( "TxLog %s: %s %s", testid, ((remote | REMOTE_ACCESS) ? "REMOTE" : "LOCAL"), msg);
  102. if (start_tx(txtype) == TX_OK) {
  103. int rv = 0;
  104. test_req_t *req;
  105. test_req_t res;
  106. product_t *p = prods;
  107. for (p = prods; p->id != -1; p++) {
  108. #if 0
  109. if (migrating && (p->xaflags() & TMNOMIGRATE)) {
  110. /* the RM does not support tx migration (see XA spec for explanation */
  111. userlogc_warn( "TxLog Info: RM %d does not support tx migration (switching from remote)", p->id);
  112. remote = !remote;
  113. }
  114. #endif
  115. if ((p->loc & remote) == 0) /* the RM does not support the requested access type */
  116. remote = p->loc;
  117. req = get_buf((remote & REMOTE_ACCESS), data, p->dbname, op, p->id, txtype, expect);
  118. userlogc_debug( "TxLog invoke prod %s (id=%d) remote=%d dbf=%s", p->pname, p->id, remote, p->dbname);
  119. userlogc_debug( "TxLog xyz_access op=%c data=%s db=%s", req->op, req->data, req->db);
  120. rv = ((remote & REMOTE_ACCESS) ? send_req(req, prbuf) : p->access(req, &res));
  121. if (rv)
  122. userlogc_warn( "TxLog BAD REQ %d", rv);
  123. free_buf((remote & REMOTE_ACCESS), req);
  124. }
  125. if (end_tx(txtype) == TX_OK)
  126. return rv;
  127. }
  128. return -1;
  129. }
  130. /**
  131.  * ensure that all the target databases start off in the same state
  132.  */
  133. static int setup() {
  134. int rv;
  135. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  136. /* start off with no records */
  137. if ((rv = db_op("DELETE AT SETUP", emps[0], '3', TX_TYPE_BEGIN_COMMIT, 0, LOCAL_ACCESS, 0, -1)) != 0)
  138. return rv;
  139. if ((rv = check_count("COUNT RECORDS", emps[0], 0, 0)))
  140. return rv;
  141. return 0;
  142. }
  143. /**
  144.  * remove all the records added by the test
  145.  */
  146. static int teardown(int *cnt)
  147. {
  148. int rv;
  149. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  150. /* delete records starting from emps[0], */
  151. if ((rv = db_op("DELETE", emps[0], '3', TX_TYPE_BEGIN_COMMIT, 0, LOCAL_ACCESS, 0, -1)))
  152. return rv;
  153. *cnt = 0;
  154. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt)))
  155. return rv;
  156. return 0;
  157. }
  158. #ifdef TEST0
  159. static int test0(int *cnt)
  160. {
  161. int rv;
  162. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  163. set_test_id("Test 0");
  164. #if 1
  165. /* ask the remote service to insert a record */
  166. if ((rv = db_op("INSERT 1", emps[5], '0', TX_TYPE_BEGIN, 0, REMOTE_ACCESS, 0, -1)))
  167. return rv;
  168. /* ask the remote service to insert another record in the same transaction */
  169. if ((rv = db_op("INSERT 2", emps[6], '0', TX_TYPE_NONE, 0, REMOTE_ACCESS, 0, -1)))
  170. return rv;
  171. /* insert a record and end the already running transaction */
  172. if ((rv = db_op("INSERT", emps[7], '0', TX_TYPE_COMMIT, 0, LOCAL_ACCESS, 1, -1)))
  173. return rv;
  174. *cnt += 3;
  175. #else
  176. if ((rv = db_op("INSERT 1", emps[5], '0', TX_TYPE_BEGIN_COMMIT, 0, LOCAL_ACCESS, 0, -1)))
  177. return rv;
  178. *cnt += 1;
  179. #endif
  180. /* make sure the record count increases by 3 */
  181. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt)))
  182. return -1;
  183. return 0;
  184. }
  185. #endif
  186. /**
  187.  * start a transaction
  188.  * perform a remote insert on each target db
  189.  * perform another remote insert on each target db
  190.  * perform a local insert on each target db
  191.  * commit the transaction
  192.  *
  193.  * Note for databases that do not support tx migration (meaning that performing a
  194.  * remote insert followed by a local insert is not supported) the local operation
  195.  * is switched to a remote one (which ensures that all the updates are done in the
  196.  * same thread).
  197.  */
  198. static int test1(int *cnt)
  199. {
  200. int rv;
  201. userlogc_debug( "TxLog ENTER %s:%d", __FUNCTION__, __LINE__);
  202. set_test_id("Test 1");
  203. /* ask the remote service to insert a record */
  204. if ((rv = db_op("INSERT 1", emps[5], '0', TX_TYPE_BEGIN, 0, REMOTE_ACCESS, 0, -1))) {
  205. userlogc_warn( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  206. return rv;
  207. }
  208. /* ask the remote service to insert another record in the same transaction */
  209. if ((rv = db_op("INSERT 2", emps[6], '0', TX_TYPE_NONE, 0, REMOTE_ACCESS, 0, -1))) {
  210. userlogc_warn( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  211. return rv;
  212. }
  213. /* insert a record and end the already running transaction */
  214. if ((rv = db_op("INSERT", emps[7], '0', TX_TYPE_COMMIT, 0, LOCAL_ACCESS, 1, -1))) {
  215. userlogc_warn( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  216. return rv;
  217. }
  218. *cnt += 3;
  219. /* make sure the record count increases by 3 */
  220. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt))) {
  221. userlogc_warn( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  222. return -1;
  223. }
  224. userlogc_debug( "TxLog LEAVE %s:%d res=%d", __FUNCTION__, __LINE__, rv);
  225. return 0;
  226. }
  227. /**
  228.  * start a transaction
  229.  * perform a remote insert on each target db
  230.  * perform another remote insert on each target db
  231.  * perform a third remote insert on each target db
  232.  * commit the transaction
  233.  */
  234. static int test2(int *cnt)
  235. {
  236. int rv;
  237. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  238. set_test_id("Test 2");
  239. /* ask the remote service to insert a record */
  240. if ((rv = db_op("INSERT 1", emps[0], '0', TX_TYPE_BEGIN, 0, REMOTE_ACCESS, 0, -1)))
  241. return rv;
  242. /* ask the remote service to insert another record in the same transaction */
  243. if ((rv = db_op("INSERT 2", emps[1], '0', TX_TYPE_NONE, 0, REMOTE_ACCESS, 0, -1)))
  244. return rv;
  245. /* insert a record and end the already running transaction */
  246. if ((rv = db_op("INSERT 3", emps[2], '0', TX_TYPE_COMMIT, 0, REMOTE_ACCESS, 0, -1)))
  247. return rv;
  248. *cnt += 3;
  249. /* make sure the record count increases by 3 */
  250. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt)))
  251. return -1;
  252. return 0;
  253. }
  254. /**
  255.  * start a transaction
  256.  * insert a record into each db
  257.  * commit the transaction
  258.  *
  259.  * repeat the test but rollback the transaction instead of commit
  260.  */
  261. static int test3(int *cnt)
  262. {
  263. int rv;
  264. userlogc_debug( "TxLog IN %s:%d", __FUNCTION__, __LINE__);
  265. set_test_id("Test 3");
  266. /* ask the remote service to insert a record */
  267. if ((rv = db_op("INSERT 1", emps[3], '0', TX_TYPE_BEGIN_COMMIT, 0, REMOTE_ACCESS, 0, -1)))
  268. return rv;
  269. *cnt += 1;
  270. /* delete records starting from emps[0] but abort it */
  271. if ((rv = db_op("INSERT WITH ABORT", emps[4], '1', TX_TYPE_BEGIN_ABORT, 0, REMOTE_ACCESS, 0, -1)))
  272. return rv;
  273. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt)))
  274. return rv;
  275. userlogc_debug( "TxLog OUT %s:%d success", __FUNCTION__, __LINE__);
  276. return 0;
  277. }
  278. /**
  279.  * start tx, do a remote insert on all dbs commit the tx
  280.  * start tx, do a remote update on all dbs commit the tx
  281.  * start tx, do a remote delete on all dbs abort the tx
  282.  * start tx, do a local delete on all dbs abort the tx
  283.  */
  284. static int test4(int *cnt)
  285. {
  286. int rv;
  287. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  288. set_test_id("Test 4");
  289. /* ask the remote service to insert a record */
  290. if ((rv = db_op("INSERT 1", emps[4], '0', TX_TYPE_BEGIN_COMMIT, 0, REMOTE_ACCESS, 0, -1)))
  291. return rv;
  292. *cnt += 1;
  293. /* modify one of the records */
  294. if ((rv = db_op("UPDATE", emps[4], '2', TX_TYPE_BEGIN_COMMIT, 0, REMOTE_ACCESS, 0, -1)))
  295. return rv;
  296. /* remote delete records starting from emps[0] with abort*/
  297. if ((rv = db_op("DELETE WITH ABORT", emps[0], '3', TX_TYPE_BEGIN_ABORT, 0, REMOTE_ACCESS, 0, -1)))
  298. return rv;
  299. /* local delete records starting from emps[0] with abort*/
  300. if ((rv = db_op("DELETE WITH ABORT", emps[0], '3', TX_TYPE_BEGIN_ABORT, 0, LOCAL_ACCESS, 0, -1)))
  301. return rv;
  302. if ((rv = check_count("COUNT RECORDS", emps[0], 0, *cnt)))
  303. return rv;
  304. return 0;
  305. }
  306. /* cause the program to halt during phase 2 of the transaction 2PC protocol */
  307. static int testrc(int *cnt)
  308. {
  309. int rv;
  310. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  311. set_test_id("Test 5");
  312. /* ask the remote service to insert a record but to halt during commit */
  313. if ((rv = tx_set_commit_return(TX_COMMIT_COMPLETED))) /* report heuristics */
  314. userlogc_warn( "tx_set_commit_return error: %d" , rv);
  315. if ((rv = db_op("INSERT 1", emps[4], '0', TX_TYPE_BEGIN_COMMIT_HALT, 0, REMOTE_ACCESS, 0, -1))) {
  316. userlogc_warn( "tpcall error %d" , rv);
  317. return rv;
  318. }
  319. /*
  320.  * Unfortunately there is no way of knowing the outcome of commiting the transaction.
  321.  * TODO check the circumstances under which heuristics are generated
  322.  * I also need to figure out why killing the server a number of times corrupts the
  323.  * Berkeley DB (which makes recovery kind of useless)
  324.  */
  325. userlogc("tx status: %d (should be -1 meaning no transaction)", get_tx_status());
  326. (void) tx_set_commit_return(TX_COMMIT_DECISION_LOGGED); /* return when the commit decision is logged */
  327. return 0;
  328. }
  329. int run_tests(product_t *prod_array)
  330. {
  331. int rv, i, cnt = 0;
  332. struct test {
  333. const char *name;
  334. int (*test)(int *);
  335. } tests[] = {
  336. #if defined(TX_RC)   // test recovery
  337. {"testrc", testrc},
  338. #elif defined(TX_MCALLS)   // tx extends over multiple tpacalls
  339. {"test1", test1},
  340. {"test2", test2},
  341. #elif defined(TX_SCALL)  // tx is active for a single tpacall
  342. {"test3", test3},
  343. {"test4", test4},
  344. #else
  345. {"test1", test1},
  346. {"test2", test2},
  347. {"test3", test3},
  348. {"test4", test4},
  349. #endif
  350. {0, 0}
  351. };
  352. userlogc_debug( "TxLog %s:%d", __FUNCTION__, __LINE__);
  353. prods = prod_array;
  354. if (tx_open() != TX_OK)
  355. return fatal("TxLog ERROR - Could not open RMs");
  356. if ((rv = setup()))
  357. return rv;
  358. for (i = 0; tests[i].test != 0; i++) {
  359. if ((rv = tests[i].test(&cnt))) {
  360. userlogc_warn( (char*) "TxLog %s FAILED", tests[i].name);
  361. return rv;
  362. }
  363. userlogc( (char*) "TxLog %s PASSED", tests[i].name);
  364. }
  365. userlogc( (char*) "TxLog Tests complete");
  366. if ((rv = teardown(&cnt)))
  367. return rv;
  368. if (tx_close() != TX_OK) {
  369. userlogc_warn( (char*) "TxLog ERROR - Could not close transaction: ");
  370. return fatal("ERROR - Could not close RMs");
  371. }
  372. return 0;
  373. }