rep_record.c
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:42k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /*-
  2.  * See the file LICENSE for redistribution information.
  3.  *
  4.  * Copyright (c) 2001-2002
  5.  * Sleepycat Software.  All rights reserved.
  6.  */
  7. #include "db_config.h"
  8. #ifndef lint
  9. static const char revid[] = "$Id: rep_record.c,v 1.111 2002/09/11 19:39:11 bostic Exp $";
  10. #endif /* not lint */
  11. #ifndef NO_SYSTEM_INCLUDES
  12. #include <stdlib.h>
  13. #include <string.h>
  14. #endif
  15. #include "db_int.h"
  16. #include "dbinc/db_page.h"
  17. #include "dbinc/db_am.h"
  18. #include "dbinc/log.h"
  19. #include "dbinc/rep.h"
  20. #include "dbinc/txn.h"
  21. static int __rep_apply __P((DB_ENV *, REP_CONTROL *, DBT *));
  22. static int __rep_collect_txn __P((DB_ENV *, DB_LSN *, LSN_COLLECTION *));
  23. static int __rep_lsn_cmp __P((const void *, const void *));
  24. static int __rep_newfile __P((DB_ENV *, REP_CONTROL *, DBT *, DB_LSN *));
  25. #define IS_SIMPLE(R) ((R) != DB___txn_regop && 
  26.     (R) != DB___txn_ckp && (R) != DB___dbreg_register)
  27. /*
  28.  * __rep_process_message --
  29.  *
  30.  * This routine takes an incoming message and processes it.
  31.  *
  32.  * control: contains the control fields from the record
  33.  * rec: contains the actual record
  34.  * eidp: contains the machine id of the sender of the message;
  35.  * in the case of a DB_NEWMASTER message, returns the eid
  36.  * of the new master.
  37.  *
  38.  * PUBLIC: int __rep_process_message __P((DB_ENV *, DBT *, DBT *, int *));
  39.  */
  40. int
  41. __rep_process_message(dbenv, control, rec, eidp)
  42. DB_ENV *dbenv;
  43. DBT *control, *rec;
  44. int *eidp;
  45. {
  46. DB_LOG *dblp;
  47. DB_LOGC *logc;
  48. DB_LSN init_lsn, lsn, newfilelsn, oldfilelsn;
  49. DB_REP *db_rep;
  50. DBT *d, data_dbt, lsndbt, mylog;
  51. LOG *lp;
  52. REP *rep;
  53. REP_CONTROL *rp;
  54. REP_VOTE_INFO *vi;
  55. u_int32_t bytes, gen, gbytes, type, unused;
  56. int check_limit, cmp, done, do_req, i;
  57. int master, old, recovering, ret, t_ret, *tally;
  58. PANIC_CHECK(dbenv);
  59. ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_stat", DB_INIT_TXN);
  60. /* Control argument must be non-Null. */
  61. if (control == NULL || control->size == 0) {
  62. __db_err(dbenv,
  63. "DB_ENV->rep_process_message: control argument must be specified");
  64. return (EINVAL);
  65. }
  66. ret = 0;
  67. db_rep = dbenv->rep_handle;
  68. rep = db_rep->region;
  69. dblp = dbenv->lg_handle;
  70. lp = dblp->reginfo.primary;
  71. MUTEX_LOCK(dbenv, db_rep->mutexp);
  72. gen = rep->gen;
  73. recovering = F_ISSET(rep, REP_F_RECOVER);
  74. rep->stat.st_msgs_processed++;
  75. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  76. rp = (REP_CONTROL *)control->data;
  77. #if 0
  78. __rep_print_message(dbenv, *eidp, rp, "rep_process_message");
  79. #endif
  80. /* Complain if we see an improper version number. */
  81. if (rp->rep_version != DB_REPVERSION) {
  82. __db_err(dbenv,
  83.     "unexpected replication message version %d, expected %d",
  84.     rp->rep_version, DB_REPVERSION);
  85. return (EINVAL);
  86. }
  87. if (rp->log_version != DB_LOGVERSION) {
  88. __db_err(dbenv,
  89.     "unexpected log record version %d, expected %d",
  90.     rp->log_version, DB_LOGVERSION);
  91. return (EINVAL);
  92. }
  93. /*
  94.  * Check for generation number matching.  Ignore any old messages
  95.  * except requests that are indicative of a new client that needs
  96.  * to get in sync.
  97.  */
  98. if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
  99.     rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ) {
  100. /*
  101.  * We don't hold the rep mutex, and could miscount if we race.
  102.  */
  103. rep->stat.st_msgs_badgen++;
  104. return (0);
  105. }
  106. if (rp->gen > gen && rp->rectype != REP_ALIVE &&
  107.     rp->rectype != REP_NEWMASTER)
  108. return (__rep_send_message(dbenv,
  109.     DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0));
  110. /*
  111.  * We need to check if we're in recovery and if we are
  112.  * then we need to ignore any messages except VERIFY, VOTE,
  113.  * ELECT (the master might fail while we are recovering), and
  114.  * ALIVE_REQ.
  115.  */
  116. if (recovering)
  117. switch(rp->rectype) {
  118. case REP_ALIVE:
  119. case REP_ALIVE_REQ:
  120. case REP_ELECT:
  121. case REP_NEWCLIENT:
  122. case REP_NEWMASTER:
  123. case REP_NEWSITE:
  124. case REP_VERIFY:
  125. R_LOCK(dbenv, &dblp->reginfo);
  126. cmp = log_compare(&lp->verify_lsn, &rp->lsn);
  127. R_UNLOCK(dbenv, &dblp->reginfo);
  128. if (cmp != 0)
  129. goto skip;
  130. /* FALLTHROUGH */
  131. case REP_VOTE1:
  132. case REP_VOTE2:
  133. break;
  134. default:
  135. skip: /*
  136.  * We don't hold the rep mutex, and could
  137.  * miscount if we race.
  138.  */
  139. rep->stat.st_msgs_recover++;
  140. /* Check for need to retransmit. */
  141. R_LOCK(dbenv, &dblp->reginfo);
  142. do_req = *eidp == rep->master_id &&
  143.     ++lp->rcvd_recs >= lp->wait_recs;
  144. if (do_req) {
  145. lp->wait_recs *= 2;
  146. if (lp->wait_recs + rep->max_gap)
  147. lp->wait_recs = rep->max_gap;
  148. lp->rcvd_recs = 0;
  149. lsn = lp->verify_lsn;
  150. }
  151. R_UNLOCK(dbenv, &dblp->reginfo);
  152. if (do_req)
  153. ret = __rep_send_message(dbenv, *eidp,
  154.     REP_VERIFY_REQ, &lsn, NULL, 0);
  155. return (ret);
  156. }
  157. switch(rp->rectype) {
  158. case REP_ALIVE:
  159. ANYSITE(dbenv);
  160. if (rp->gen > gen && rp->flags)
  161. return (__rep_new_master(dbenv, rp, *eidp));
  162. break;
  163. case REP_ALIVE_REQ:
  164. ANYSITE(dbenv);
  165. dblp = dbenv->lg_handle;
  166. R_LOCK(dbenv, &dblp->reginfo);
  167. lsn = ((LOG *)dblp->reginfo.primary)->lsn;
  168. R_UNLOCK(dbenv, &dblp->reginfo);
  169. return (__rep_send_message(dbenv,
  170.     *eidp, REP_ALIVE, &lsn, NULL,
  171.     F_ISSET(dbenv, DB_ENV_REP_MASTER) ? 1 : 0));
  172. case REP_ALL_REQ:
  173. MASTER_ONLY(dbenv);
  174. gbytes  = bytes = 0;
  175. MUTEX_LOCK(dbenv, db_rep->mutexp);
  176. gbytes = rep->gbytes;
  177. bytes = rep->bytes;
  178. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  179. check_limit = gbytes != 0 || bytes != 0;
  180. if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
  181. return (ret);
  182. memset(&data_dbt, 0, sizeof(data_dbt));
  183. oldfilelsn = lsn = rp->lsn;
  184. type = REP_LOG;
  185. for (ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET);
  186.     ret == 0 && type == REP_LOG;
  187.     ret = logc->get(logc, &lsn, &data_dbt, DB_NEXT)) {
  188. /*
  189.  * lsn.offset will only be 0 if this is the
  190.  * beginning of the log;  DB_SET, but not DB_NEXT,
  191.  * can set the log cursor to [n][0].
  192.  */
  193. if (lsn.offset == 0)
  194. ret = __rep_send_message(dbenv, *eidp,
  195.     REP_NEWFILE, &lsn, NULL, 0);
  196. else {
  197. /*
  198.  * DB_NEXT will never run into offsets
  199.  * of 0;  thus, when a log file changes,
  200.  * we'll have a real log record with
  201.  * some lsn [n][m], and we'll also want to send
  202.  * a NEWFILE message with lsn [n][0].
  203.  * So that the client can detect gaps,
  204.  * send in the rec parameter the
  205.  * last LSN in the old file.
  206.  */
  207. if (lsn.file != oldfilelsn.file) {
  208. newfilelsn.file = lsn.file;
  209. newfilelsn.offset = 0;
  210. memset(&lsndbt, 0, sizeof(DBT));
  211. lsndbt.size = sizeof(DB_LSN);
  212. lsndbt.data = &oldfilelsn;
  213. if ((ret = __rep_send_message(dbenv,
  214.     *eidp, REP_NEWFILE, &newfilelsn,
  215.     &lsndbt, 0)) != 0)
  216. break;
  217. }
  218. if (check_limit) {
  219. /*
  220.  * data_dbt.size is only the size of
  221.  * the log record;  it doesn't count
  222.  * the size of the control structure.
  223.  * Factor that in as well so we're
  224.  * not off by a lot if our log
  225.  * records are small.
  226.  */
  227. while (bytes < data_dbt.size +
  228.     sizeof(REP_CONTROL)) {
  229. if (gbytes > 0) {
  230. bytes += GIGABYTE;
  231. --gbytes;
  232. continue;
  233. }
  234. /*
  235.  * We don't hold the rep mutex,
  236.  * and may miscount.
  237.  */
  238. rep->stat.st_nthrottles++;
  239. type = REP_LOG_MORE;
  240. goto send;
  241. }
  242. bytes -= (data_dbt.size +
  243.     sizeof(REP_CONTROL));
  244. }
  245. send: ret = __rep_send_message(dbenv, *eidp,
  246.     type, &lsn, &data_dbt, 0);
  247. }
  248. /*
  249.  * In case we're about to change files and need it
  250.  * for a NEWFILE message, save the current LSN.
  251.  */
  252. oldfilelsn = lsn;
  253. }
  254. if (ret == DB_NOTFOUND)
  255. ret = 0;
  256. if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
  257. ret = t_ret;
  258. return (ret);
  259. case REP_ELECT:
  260. if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
  261. R_LOCK(dbenv, &dblp->reginfo);
  262. lsn = lp->lsn;
  263. R_UNLOCK(dbenv, &dblp->reginfo);
  264. MUTEX_LOCK(dbenv, db_rep->mutexp);
  265. rep->gen++;
  266. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  267. return (__rep_send_message(dbenv,
  268.     *eidp, REP_NEWMASTER, &lsn, NULL, 0));
  269. }
  270. MUTEX_LOCK(dbenv, db_rep->mutexp);
  271. ret = IN_ELECTION(rep) ? 0 : DB_REP_HOLDELECTION;
  272. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  273. return (ret);
  274. #ifdef NOTYET
  275. case REP_FILE: /* TODO */
  276. CLIENT_ONLY(dbenv);
  277. break;
  278. case REP_FILE_REQ:
  279. MASTER_ONLY(dbenv);
  280. return (__rep_send_file(dbenv, rec, *eidp));
  281. break;
  282. #endif
  283. case REP_LOG:
  284. case REP_LOG_MORE:
  285. CLIENT_ONLY(dbenv);
  286. if ((ret = __rep_apply(dbenv, rp, rec)) != 0)
  287. return (ret);
  288. if (rp->rectype == REP_LOG_MORE) {
  289. MUTEX_LOCK(dbenv, db_rep->db_mutexp);
  290. master = rep->master_id;
  291. MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
  292. R_LOCK(dbenv, &dblp->reginfo);
  293. lsn = lp->lsn;
  294. R_UNLOCK(dbenv, &dblp->reginfo);
  295. ret = __rep_send_message(dbenv, master,
  296.     REP_ALL_REQ, &lsn, NULL, 0);
  297. }
  298. return (ret);
  299. case REP_LOG_REQ:
  300. MASTER_ONLY(dbenv);
  301. if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
  302. return (ret);
  303. memset(&data_dbt, 0, sizeof(data_dbt));
  304. lsn = rp->lsn;
  305. /*
  306.  * There are three different cases here.
  307.  * 1. We asked for a particular LSN and got it.
  308.  * 2. We asked for an LSN of X,0 which is invalid and got the
  309.  *  first log record in a particular file.
  310.  * 3. We asked for an LSN and it's not found because it is
  311.  * beyond the end of a log file and we need a NEWFILE msg.
  312.  */
  313. ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET);
  314. cmp = log_compare(&lsn, &rp->lsn);
  315. if (ret == 0 && cmp == 0) /* Case 1 */
  316. ret = __rep_send_message(dbenv, *eidp,
  317.     REP_LOG, &rp->lsn, &data_dbt, 0);
  318. else if (ret == DB_NOTFOUND ||
  319.     (ret == 0 && cmp < 0 && rp->lsn.offset == 0))
  320. /* Cases 2 and 3: Send a NEWFILE message. */
  321. ret = __rep_send_message(dbenv, *eidp,
  322.     REP_NEWFILE, &lsn, NULL, 0);
  323. if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
  324. ret = t_ret;
  325. return (ret);
  326. case REP_NEWSITE:
  327. /* We don't hold the rep mutex, and may miscount. */
  328. rep->stat.st_newsites++;
  329. /* This is a rebroadcast; simply tell the application. */
  330. if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
  331. dblp = dbenv->lg_handle;
  332. lp = dblp->reginfo.primary;
  333. R_LOCK(dbenv, &dblp->reginfo);
  334. lsn = lp->lsn;
  335. R_UNLOCK(dbenv, &dblp->reginfo);
  336. (void)__rep_send_message(dbenv,
  337.     *eidp, REP_NEWMASTER, &lsn, NULL, 0);
  338. }
  339. return (DB_REP_NEWSITE);
  340. case REP_NEWCLIENT:
  341. /*
  342.  * This message was received and should have resulted in the
  343.  * application entering the machine ID in its machine table.
  344.  * We respond to this with an ALIVE to send relevant information
  345.  * to the new client.  But first, broadcast the new client's
  346.  * record to all the clients.
  347.  */
  348. if ((ret = __rep_send_message(dbenv,
  349.     DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0)) != 0)
  350. return (ret);
  351. if (F_ISSET(dbenv, DB_ENV_REP_CLIENT))
  352. return (0);
  353.  /* FALLTHROUGH */
  354. case REP_MASTER_REQ:
  355. ANYSITE(dbenv);
  356. if (F_ISSET(dbenv, DB_ENV_REP_CLIENT))
  357. return (0);
  358. dblp = dbenv->lg_handle;
  359. lp = dblp->reginfo.primary;
  360. R_LOCK(dbenv, &dblp->reginfo);
  361. lsn = lp->lsn;
  362. R_UNLOCK(dbenv, &dblp->reginfo);
  363. return (__rep_send_message(dbenv,
  364.     *eidp, REP_NEWMASTER, &lsn, NULL, 0));
  365. case REP_NEWFILE:
  366. CLIENT_ONLY(dbenv);
  367. return (__rep_apply(dbenv, rp, rec));
  368. case REP_NEWMASTER:
  369. ANYSITE(dbenv);
  370. if (F_ISSET(dbenv, DB_ENV_REP_MASTER) &&
  371.     *eidp != dbenv->rep_eid) {
  372. /* We don't hold the rep mutex, and may miscount. */
  373. rep->stat.st_dupmasters++;
  374. return (DB_REP_DUPMASTER);
  375. }
  376. return (__rep_new_master(dbenv, rp, *eidp));
  377. case REP_PAGE: /* TODO */
  378. CLIENT_ONLY(dbenv);
  379. break;
  380. case REP_PAGE_REQ: /* TODO */
  381. MASTER_ONLY(dbenv);
  382. break;
  383. case REP_PLIST: /* TODO */
  384. CLIENT_ONLY(dbenv);
  385. break;
  386. case REP_PLIST_REQ: /* TODO */
  387. MASTER_ONLY(dbenv);
  388. break;
  389. case REP_VERIFY:
  390. CLIENT_ONLY(dbenv);
  391. DB_ASSERT((F_ISSET(rep, REP_F_RECOVER) &&
  392.     !IS_ZERO_LSN(lp->verify_lsn)) ||
  393.     (!F_ISSET(rep, REP_F_RECOVER) &&
  394.     IS_ZERO_LSN(lp->verify_lsn)));
  395. if (IS_ZERO_LSN(lp->verify_lsn))
  396. return (0);
  397. if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
  398. return (ret);
  399. memset(&mylog, 0, sizeof(mylog));
  400. if ((ret = logc->get(logc, &rp->lsn, &mylog, DB_SET)) != 0)
  401. goto rep_verify_err;
  402. if (mylog.size == rec->size &&
  403.     memcmp(mylog.data, rec->data, rec->size) == 0) {
  404. /*
  405.  * If we're a logs-only client, we can simply truncate
  406.  * the log to the point where it last agreed with the
  407.  * master's;  otherwise, recover to that point.
  408.  */
  409. R_LOCK(dbenv, &dblp->reginfo);
  410. ZERO_LSN(lp->verify_lsn);
  411. R_UNLOCK(dbenv, &dblp->reginfo);
  412. if (F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) {
  413. INIT_LSN(init_lsn);
  414. if ((ret = dbenv->log_flush(dbenv,
  415.     &rp->lsn)) != 0 ||
  416.     (ret = __log_vtruncate(dbenv,
  417.     &rp->lsn, &init_lsn)) != 0)
  418. goto rep_verify_err;
  419. } else if ((ret = __db_apprec(dbenv, &rp->lsn, 0)) != 0)
  420. goto rep_verify_err;
  421. /*
  422.  * The log has been truncated (either by __db_apprec or
  423.  * directly).  We want to make sure we're waiting for
  424.  * the LSN at the new end-of-log, not some later point.
  425.  */
  426. R_LOCK(dbenv, &dblp->reginfo);
  427. lp->ready_lsn = lp->lsn;
  428. ZERO_LSN(lp->waiting_lsn);
  429. R_UNLOCK(dbenv, &dblp->reginfo);
  430. /*
  431.  * Discard any log records we have queued;  we're
  432.  * about to re-request them, and can't trust the
  433.  * ones in the queue.
  434.  */
  435. MUTEX_LOCK(dbenv, db_rep->db_mutexp);
  436. if ((ret = db_rep->rep_db->truncate(db_rep->rep_db,
  437.     NULL, &unused, 0)) != 0) {
  438. MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
  439. goto rep_verify_err;
  440. }
  441. rep->stat.st_log_queued = 0;
  442. MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
  443. MUTEX_LOCK(dbenv, db_rep->mutexp);
  444. F_CLR(rep, REP_F_RECOVER);
  445. /*
  446.  * If the master_id is invalid, this means that since
  447.  * the last record was sent, somebody declared an
  448.  * election and we may not have a master to request
  449.  * things of.
  450.  *
  451.  * This is not an error;  when we find a new master,
  452.  * we'll re-negotiate where the end of the log is and
  453.  * try to bring ourselves up to date again anyway.
  454.  */
  455. if ((master = rep->master_id) == DB_EID_INVALID) {
  456. DB_ASSERT(IN_ELECTION(rep));
  457. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  458. ret = 0;
  459. } else {
  460. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  461. ret = __rep_send_message(dbenv, master,
  462.     REP_ALL_REQ, &rp->lsn, NULL, 0);
  463. }
  464. } else if ((ret =
  465.     logc->get(logc, &lsn, &mylog, DB_PREV)) == 0) {
  466. R_LOCK(dbenv, &dblp->reginfo);
  467. lp->verify_lsn = lsn;
  468. lp->rcvd_recs = 0;
  469. lp->wait_recs = rep->request_gap;
  470. R_UNLOCK(dbenv, &dblp->reginfo);
  471. ret = __rep_send_message(dbenv,
  472.     *eidp, REP_VERIFY_REQ, &lsn, NULL, 0);
  473. }
  474. rep_verify_err: if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
  475. ret = t_ret;
  476. return (ret);
  477. case REP_VERIFY_FAIL:
  478. rep->stat.st_outdated++;
  479. return (DB_REP_OUTDATED);
  480. case REP_VERIFY_REQ:
  481. MASTER_ONLY(dbenv);
  482. type = REP_VERIFY;
  483. if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
  484. return (ret);
  485. d = &data_dbt;
  486. memset(d, 0, sizeof(data_dbt));
  487. F_SET(logc, DB_LOG_SILENT_ERR);
  488. ret = logc->get(logc, &rp->lsn, d, DB_SET);
  489. /*
  490.  * If the LSN was invalid, then we might get a not
  491.  * found, we might get an EIO, we could get anything.
  492.  * If we get a DB_NOTFOUND, then there is a chance that
  493.  * the LSN comes before the first file present in which
  494.  * case we need to return a fail so that the client can return
  495.  * a DB_OUTDATED.
  496.  */
  497. if (ret == DB_NOTFOUND &&
  498.     __log_is_outdated(dbenv, rp->lsn.file, &old) == 0 &&
  499.     old != 0)
  500. type = REP_VERIFY_FAIL;
  501. if (ret != 0)
  502. d = NULL;
  503. ret = __rep_send_message(dbenv, *eidp, type, &rp->lsn, d, 0);
  504. if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
  505. ret = t_ret;
  506. return (ret);
  507. case REP_VOTE1:
  508. if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
  509. #ifdef DIAGNOSTIC
  510. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  511. __db_err(dbenv, "Master received vote");
  512. #endif
  513. R_LOCK(dbenv, &dblp->reginfo);
  514. lsn = lp->lsn;
  515. R_UNLOCK(dbenv, &dblp->reginfo);
  516. return (__rep_send_message(dbenv,
  517.     *eidp, REP_NEWMASTER, &lsn, NULL, 0));
  518. }
  519. vi = (REP_VOTE_INFO *)rec->data;
  520. MUTEX_LOCK(dbenv, db_rep->mutexp);
  521. /*
  522.  * If you get a vote and you're not in an election, simply
  523.  * return an indicator to hold an election which will trigger
  524.  * this site to send its vote again.
  525.  */
  526. if (!IN_ELECTION(rep)) {
  527. #ifdef DIAGNOSTIC
  528. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  529. __db_err(dbenv,
  530.     "Not in election, but received vote1");
  531. #endif
  532. ret = DB_REP_HOLDELECTION;
  533. goto unlock;
  534. }
  535. if (F_ISSET(rep, REP_F_EPHASE2))
  536. goto unlock;
  537. /* Check if this site knows about more sites than we do. */
  538. if (vi->nsites > rep->nsites)
  539. rep->nsites = vi->nsites;
  540. /* Check if we've heard from this site already. */
  541. tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
  542. for (i = 0; i < rep->sites; i++) {
  543. if (tally[i] == *eidp)
  544. /* Duplicate vote. */
  545. goto unlock;
  546. }
  547. /*
  548.  * We are keeping vote, let's see if that changes our count of
  549.  * the number of sites.
  550.  */
  551. if (rep->sites + 1 > rep->nsites)
  552. rep->nsites = rep->sites + 1;
  553. if (rep->nsites > rep->asites &&
  554.     (ret = __rep_grow_sites(dbenv, rep->nsites)) != 0)
  555. goto unlock;
  556. tally[rep->sites] = *eidp;
  557. rep->sites++;
  558. /*
  559.  * Change winners if the incoming record has a higher
  560.  * priority, or an equal priority but a larger LSN, or
  561.  * an equal priority and LSN but higher "tiebreaker" value.
  562.  */
  563. #ifdef DIAGNOSTIC
  564. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
  565. __db_err(dbenv,
  566.     "%s(eid)%d (pri)%d (gen)%d (sites)%d [%d,%d]",
  567.     "Existing vote: ",
  568.     rep->winner, rep->w_priority, rep->w_gen,
  569.     rep->sites, rep->w_lsn.file, rep->w_lsn.offset);
  570. __db_err(dbenv,
  571.     "Incoming vote: (eid)%d (pri)%d (gen)%d [%d,%d]",
  572.     *eidp, vi->priority, rp->gen, rp->lsn.file,
  573.     rp->lsn.offset);
  574. }
  575. #endif
  576. cmp = log_compare(&rp->lsn, &rep->w_lsn);
  577. if (vi->priority > rep->w_priority ||
  578.     (vi->priority != 0 && vi->priority == rep->w_priority &&
  579.     (cmp > 0 ||
  580.     (cmp == 0 && vi->tiebreaker > rep->w_tiebreaker)))) {
  581. #ifdef DIAGNOSTIC
  582. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  583. __db_err(dbenv, "Accepting new vote");
  584. #endif
  585. rep->winner = *eidp;
  586. rep->w_priority = vi->priority;
  587. rep->w_lsn = rp->lsn;
  588. rep->w_gen = rp->gen;
  589. }
  590. master = rep->winner;
  591. lsn = rep->w_lsn;
  592. done = rep->sites == rep->nsites && rep->w_priority != 0;
  593. if (done) {
  594. #ifdef DIAGNOSTIC
  595. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
  596. __db_err(dbenv, "Phase1 election done");
  597. __db_err(dbenv, "Voting for %d%s",
  598.     master, master == rep->eid ? "(self)" : "");
  599. }
  600. #endif
  601. F_CLR(rep, REP_F_EPHASE1);
  602. F_SET(rep, REP_F_EPHASE2);
  603. }
  604. if (done && master == rep->eid) {
  605. rep->votes++;
  606. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  607. return (0);
  608. }
  609. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  610. /* Vote for someone else. */
  611. if (done)
  612. return (__rep_send_message(dbenv,
  613.     master, REP_VOTE2, NULL, NULL, 0));
  614. /* Election is still going on. */
  615. break;
  616. case REP_VOTE2:
  617. #ifdef DIAGNOSTIC
  618. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  619. __db_err(dbenv, "We received a vote%s",
  620.     F_ISSET(dbenv, DB_ENV_REP_MASTER) ?
  621.     " (master)" : "");
  622. #endif
  623. if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
  624. R_LOCK(dbenv, &dblp->reginfo);
  625. lsn = lp->lsn;
  626. R_UNLOCK(dbenv, &dblp->reginfo);
  627. rep->stat.st_elections_won++;
  628. return (__rep_send_message(dbenv,
  629.     *eidp, REP_NEWMASTER, &lsn, NULL, 0));
  630. }
  631. MUTEX_LOCK(dbenv, db_rep->mutexp);
  632. /* If we have priority 0, we should never get a vote. */
  633. DB_ASSERT(rep->priority != 0);
  634. if (!IN_ELECTION(rep)) {
  635. #ifdef DIAGNOSTIC
  636. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  637. __db_err(dbenv, "Not in election, got vote");
  638. #endif
  639. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  640. return (DB_REP_HOLDELECTION);
  641. }
  642. /* avoid counting duplicates. */
  643. rep->votes++;
  644. done = rep->votes > rep->nsites / 2;
  645. if (done) {
  646. rep->master_id = rep->eid;
  647. rep->gen = rep->w_gen + 1;
  648. ELECTION_DONE(rep);
  649. F_CLR(rep, REP_F_UPGRADE);
  650. F_SET(rep, REP_F_MASTER);
  651. *eidp = rep->master_id;
  652. #ifdef DIAGNOSTIC
  653. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  654. __db_err(dbenv,
  655. "Got enough votes to win; election done; winner is %d",
  656.     rep->master_id);
  657. #endif
  658. }
  659. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  660. if (done) {
  661. R_LOCK(dbenv, &dblp->reginfo);
  662. lsn = lp->lsn;
  663. R_UNLOCK(dbenv, &dblp->reginfo);
  664. /* Declare me the winner. */
  665. #ifdef DIAGNOSTIC
  666. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  667. __db_err(dbenv, "I won, sending NEWMASTER");
  668. #endif
  669. rep->stat.st_elections_won++;
  670. if ((ret = __rep_send_message(dbenv, DB_EID_BROADCAST,
  671.     REP_NEWMASTER, &lsn, NULL, 0)) != 0)
  672. break;
  673. return (DB_REP_NEWMASTER);
  674. }
  675. break;
  676. default:
  677. __db_err(dbenv,
  678. "DB_ENV->rep_process_message: unknown replication message: type %lu",
  679.    (u_long)rp->rectype);
  680. return (EINVAL);
  681. }
  682. return (0);
  683. unlock: MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  684. return (ret);
  685. }
  686. /*
  687.  * __rep_apply --
  688.  *
  689.  * Handle incoming log records on a client, applying when possible and
  690.  * entering into the bookkeeping table otherwise.  This is the guts of
  691.  * the routine that handles the state machine that describes how we
  692.  * process and manage incoming log records.
  693.  */
  694. static int
  695. __rep_apply(dbenv, rp, rec)
  696. DB_ENV *dbenv;
  697. REP_CONTROL *rp;
  698. DBT *rec;
  699. {
  700. __dbreg_register_args dbreg_args;
  701. __txn_ckp_args ckp_args;
  702. DB_REP *db_rep;
  703. DBT control_dbt, key_dbt, lsn_dbt, nextrec_dbt, rec_dbt;
  704. DB *dbp;
  705. DBC *dbc;
  706. DB_LOG *dblp;
  707. DB_LSN ckp_lsn, lsn, newfile_lsn, next_lsn, waiting_lsn;
  708. LOG *lp;
  709. REP *rep;
  710. REP_CONTROL lsn_rc;
  711. u_int32_t rectype, txnid;
  712. int cmp, do_req, eid, have_mutex, ret, t_ret;
  713. db_rep = dbenv->rep_handle;
  714. rep = db_rep->region;
  715. dbp = db_rep->rep_db;
  716. dbc = NULL;
  717. have_mutex = ret = 0;
  718. memset(&control_dbt, 0, sizeof(control_dbt));
  719. memset(&rec_dbt, 0, sizeof(rec_dbt));
  720. /*
  721.  * If this is a log record and it's the next one in line, simply
  722.  * write it to the log.  If it's a "normal" log record, i.e., not
  723.  * a COMMIT or CHECKPOINT or something that needs immediate processing,
  724.  * just return.  If it's a COMMIT, CHECKPOINT or LOG_REGISTER (i.e.,
  725.  * not SIMPLE), handle it now.  If it's a NEWFILE record, then we
  726.  * have to be prepared to deal with a logfile change.
  727.  */
  728. dblp = dbenv->lg_handle;
  729. R_LOCK(dbenv, &dblp->reginfo);
  730. lp = dblp->reginfo.primary;
  731. cmp = log_compare(&rp->lsn, &lp->ready_lsn);
  732. /*
  733.  * This is written to assume that you don't end up with a lot of
  734.  * records after a hole.  That is, it optimizes for the case where
  735.  * there is only a record or two after a hole.  If you have a lot
  736.  * of records after a hole, what you'd really want to do is write
  737.  * all of them and then process all the commits, checkpoints, etc.
  738.  * together.  That is more complicated processing that we can add
  739.  * later if necessary.
  740.  *
  741.  * That said, I really don't want to do db operations holding the
  742.  * log mutex, so the synchronization here is tricky.
  743.  */
  744. if (cmp == 0) {
  745. /* We got the log record that we are expecting. */
  746. if (rp->rectype == REP_NEWFILE) {
  747. newfile: ret = __rep_newfile(dbenv, rp, rec, &lp->ready_lsn);
  748. /* Make this evaluate to a simple rectype. */
  749. rectype = 0;
  750. } else {
  751. DB_ASSERT(log_compare(&rp->lsn, &lp->lsn) == 0);
  752. ret = __log_rep_put(dbenv, &rp->lsn, rec);
  753. lp->ready_lsn = lp->lsn;
  754. memcpy(&rectype, rec->data, sizeof(rectype));
  755. if (ret == 0)
  756. /*
  757.  * We may miscount if we race, since we
  758.  * don't currently hold the rep mutex.
  759.  */
  760. rep->stat.st_log_records++;
  761. }
  762. while (ret == 0 && IS_SIMPLE(rectype) &&
  763.     log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
  764. /*
  765.  * We just filled in a gap in the log record stream.
  766.  * Write subsequent records to the log.
  767.  */
  768. gap_check: lp->wait_recs = 0;
  769. lp->rcvd_recs = 0;
  770. R_UNLOCK(dbenv, &dblp->reginfo);
  771. if (have_mutex == 0) {
  772. MUTEX_LOCK(dbenv, db_rep->db_mutexp);
  773. have_mutex = 1;
  774. }
  775. if (dbc == NULL &&
  776.     (ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
  777. goto err;
  778. /* The DBTs need to persist through another call. */
  779. F_SET(&control_dbt, DB_DBT_REALLOC);
  780. F_SET(&rec_dbt, DB_DBT_REALLOC);
  781. if ((ret = dbc->c_get(dbc,
  782.     &control_dbt, &rec_dbt, DB_RMW | DB_FIRST)) != 0)
  783. goto err;
  784. rp = (REP_CONTROL *)control_dbt.data;
  785. rec = &rec_dbt;
  786. memcpy(&rectype, rec->data, sizeof(rectype));
  787. R_LOCK(dbenv, &dblp->reginfo);
  788. /*
  789.  * We need to check again, because it's possible that
  790.  * some other thread of control changed the waiting_lsn
  791.  * or removed that record from the database.
  792.  */
  793. if (log_compare(&lp->ready_lsn, &rp->lsn) == 0) {
  794. if (rp->rectype != REP_NEWFILE) {
  795. DB_ASSERT(log_compare
  796.     (&rp->lsn, &lp->lsn) == 0);
  797. ret = __log_rep_put(dbenv,
  798.     &rp->lsn, rec);
  799. lp->ready_lsn = lp->lsn;
  800. /*
  801.  * We may miscount if we race, since we
  802.  * don't currently hold the rep mutex.
  803.  */
  804. if (ret == 0)
  805. rep->stat.st_log_records++;
  806. } else {
  807. ret = __rep_newfile(dbenv,
  808.     rp, rec, &lp->ready_lsn);
  809. rectype = 0;
  810. }
  811. waiting_lsn = lp->waiting_lsn;
  812. R_UNLOCK(dbenv, &dblp->reginfo);
  813. if ((ret = dbc->c_del(dbc, 0)) != 0)
  814. goto err;
  815. /*
  816.  * We may miscount, as we don't hold the rep
  817.  * mutex.
  818.  */
  819. --rep->stat.st_log_queued;
  820. /*
  821.  * Update waiting_lsn.  We need to move it
  822.  * forward to the LSN of the next record
  823.  * in the queue.
  824.  */
  825. memset(&lsn_dbt, 0, sizeof(lsn_dbt));
  826. F_SET(&lsn_dbt, DB_DBT_USERMEM);
  827. lsn_dbt.data = &lsn_rc;
  828. lsn_dbt.ulen = sizeof(lsn_rc);
  829. memset(&lsn_rc, 0, sizeof(lsn_rc));
  830. /*
  831.  * If the next item in the database is a log
  832.  * record--the common case--we're not
  833.  * interested in its contents, just in its LSN.
  834.  * If it's a newfile message, though, the
  835.  * data field may be the LSN of the last
  836.  * record in the old file, and we need to use
  837.  * that to determine whether or not there's
  838.  * a gap.
  839.  *
  840.  * Optimize both these cases by doing a partial
  841.  * get of the data item.  If it's a newfile
  842.  * record, we'll get the whole LSN, and if
  843.  * it's not, we won't waste time allocating.
  844.  */
  845. memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
  846. F_SET(&nextrec_dbt,
  847.     DB_DBT_USERMEM | DB_DBT_PARTIAL);
  848. nextrec_dbt.ulen =
  849.     nextrec_dbt.dlen = sizeof(newfile_lsn);
  850. ZERO_LSN(newfile_lsn);
  851. nextrec_dbt.data = &newfile_lsn;
  852. ret = dbc->c_get(dbc,
  853.     &lsn_dbt, &nextrec_dbt, DB_NEXT);
  854. if (ret != DB_NOTFOUND && ret != 0)
  855. goto err;
  856. R_LOCK(dbenv, &dblp->reginfo);
  857. if (ret == DB_NOTFOUND) {
  858. /*
  859.  * Do a quick double-check to make
  860.  * sure waiting_lsn hasn't changed.
  861.  * It's possible that between the
  862.  * DB_NOTFOUND return and the R_LOCK,
  863.  * some record was added to the
  864.  * database, and we don't want to lose
  865.  * sight of the fact that it's there.
  866.  */
  867. if (log_compare(&waiting_lsn,
  868.     &lp->waiting_lsn) == 0)
  869. ZERO_LSN(
  870.     lp->waiting_lsn);
  871. /*
  872.  * Whether or not the current record is
  873.  * simple, there's no next one, and
  874.  * therefore we haven't got anything
  875.  * else to do right now.  Break out.
  876.  */
  877. break;
  878. }
  879. DB_ASSERT(lsn_dbt.size == sizeof(lsn_rc));
  880. /*
  881.  * NEWFILE records have somewhat convoluted
  882.  * semantics, so there are five cases
  883.  * pertaining to what the newly-gotten record
  884.  * is and what we want to do about it.
  885.  *
  886.  * 1) This isn't a NEWFILE record.  Advance
  887.  *    waiting_lsn and proceed.
  888.  *
  889.  * 2) NEWFILE, no LSN stored as the datum,
  890.  *    lsn_rc.lsn == ready_lsn.  The NEWFILE
  891.  *    record is next, so set waiting_lsn =
  892.  *    ready_lsn.
  893.  *
  894.  * 3) NEWFILE, no LSN stored as the datum, but
  895.  *    lsn_rc.lsn > ready_lsn.  There's still a
  896.  *    gap; set waiting_lsn = lsn_rc.lsn.
  897.  *
  898.  * 4) NEWFILE, newfile_lsn in datum, and it's <
  899.  *    ready_lsn. (If the datum is non-empty,
  900.  *    it's the LSN of the last record in a log
  901.  *    file, not the end of the log, and
  902.  *    lsn_rc.lsn is the LSN of the start of
  903.  *    the new file--we didn't have the end of
  904.  *    the old log handy when we sent the
  905.  *    record.)  No gap--we're ready to
  906.  *    proceed.  Set both waiting and ready_lsn
  907.  *    to lsn_rc.lsn.
  908.  *
  909.  * 5) NEWFILE, newfile_lsn in datum, and it's >=
  910.  *    ready_lsn.  We're still missing at
  911.  *    least one record;  set waiting_lsn,
  912.  *    but not ready_lsn, to lsn_rc.lsn.
  913.  */
  914. if (lsn_rc.rectype == REP_NEWFILE &&
  915.     nextrec_dbt.size > 0 && log_compare(
  916.     &newfile_lsn, &lp->ready_lsn) < 0)
  917. /* Case 4. */
  918. lp->ready_lsn =
  919.     lp->waiting_lsn = lsn_rc.lsn;
  920. else {
  921. /* Cases 1, 2, 3, and 5. */
  922. DB_ASSERT(log_compare(&lsn_rc.lsn,
  923.     &lp->ready_lsn) >= 0);
  924. lp->waiting_lsn = lsn_rc.lsn;
  925. }
  926. /*
  927.  * If the current rectype is simple, we're
  928.  * done with it, and we should check and see
  929.  * whether the next record queued is the next
  930.  * one we're ready for.  This is just the loop
  931.  * condition, so we continue.
  932.  *
  933.  * Otherwise, we need to break out of this loop
  934.  * and process this record first.
  935.  */
  936. if (!IS_SIMPLE(rectype))
  937. break;
  938. }
  939. }
  940. /*
  941.  * Check if we're at a gap in the table and if so, whether we
  942.  * need to ask for any records.
  943.  */
  944. do_req = 0;
  945. if (!IS_ZERO_LSN(lp->waiting_lsn) &&
  946.     log_compare(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
  947. next_lsn = lp->ready_lsn;
  948. do_req = ++lp->rcvd_recs >= lp->wait_recs;
  949. if (do_req) {
  950. lp->wait_recs = rep->request_gap;
  951. lp->rcvd_recs = 0;
  952. }
  953. }
  954. R_UNLOCK(dbenv, &dblp->reginfo);
  955. if (dbc != NULL) {
  956. if ((ret = dbc->c_close(dbc)) != 0)
  957. goto err;
  958. MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
  959. have_mutex = 0;
  960. }
  961. dbc = NULL;
  962. if (do_req) {
  963. MUTEX_LOCK(dbenv, db_rep->mutexp);
  964. eid = db_rep->region->master_id;
  965. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  966. if (eid != DB_EID_INVALID) {
  967. rep->stat.st_log_requested++;
  968. if ((ret = __rep_send_message(dbenv,
  969.     eid, REP_LOG_REQ, &next_lsn, NULL, 0)) != 0)
  970. goto err;
  971. }
  972. }
  973. } else if (cmp > 0) {
  974. /*
  975.  * The LSN is higher than the one we were waiting for.
  976.  * If it is a NEWFILE message, this may not mean that
  977.  * there's a gap;  in some cases, NEWFILE messages contain
  978.  * the LSN of the beginning of the new file instead
  979.  * of the end of the old.
  980.  *
  981.  * In these cases, the rec DBT will contain the last LSN
  982.  * of the old file, so we can tell whether there's a gap.
  983.  */
  984. if (rp->rectype == REP_NEWFILE &&
  985.     rp->lsn.file == lp->ready_lsn.file + 1 &&
  986.     rp->lsn.offset == 0) {
  987. DB_ASSERT(rec != NULL && rec->data != NULL &&
  988.     rec->size == sizeof(DB_LSN));
  989. memcpy(&lsn, rec->data, sizeof(DB_LSN));
  990. if (log_compare(&lp->ready_lsn, &lsn) > 0)
  991. /*
  992.  * The last LSN in the old file is smaller
  993.  * than the one we're expecting, so there's
  994.  * no gap--the one we're expecting just
  995.  * doesn't exist.
  996.  */
  997. goto newfile;
  998. }
  999. /*
  1000.  * This record isn't in sequence; add it to the table and
  1001.  * update waiting_lsn if necessary.
  1002.  */
  1003. memset(&key_dbt, 0, sizeof(key_dbt));
  1004. key_dbt.data = rp;
  1005. key_dbt.size = sizeof(*rp);
  1006. next_lsn = lp->lsn;
  1007. do_req = 0;
  1008. if (lp->wait_recs == 0) {
  1009. /*
  1010.  * This is a new gap. Initialize the number of
  1011.  * records that we should wait before requesting
  1012.  * that it be resent.  We grab the limits out of
  1013.  * the rep without the mutex.
  1014.  */
  1015. lp->wait_recs = rep->request_gap;
  1016. lp->rcvd_recs = 0;
  1017. }
  1018. if (++lp->rcvd_recs >= lp->wait_recs) {
  1019. /*
  1020.  * If we've waited long enough, request the record
  1021.  * and double the wait interval.
  1022.  */
  1023. do_req = 1;
  1024. lp->wait_recs <<= 1;
  1025. lp->rcvd_recs = 0;
  1026. if (lp->wait_recs > rep->max_gap)
  1027. lp->wait_recs = rep->max_gap;
  1028. }
  1029. R_UNLOCK(dbenv, &dblp->reginfo);
  1030. MUTEX_LOCK(dbenv, db_rep->db_mutexp);
  1031. ret = dbp->put(dbp, NULL, &key_dbt, rec, 0);
  1032. rep->stat.st_log_queued++;
  1033. rep->stat.st_log_queued_total++;
  1034. if (rep->stat.st_log_queued_max < rep->stat.st_log_queued)
  1035. rep->stat.st_log_queued_max = rep->stat.st_log_queued;
  1036. MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
  1037. if (ret != 0)
  1038. return (ret);
  1039. R_LOCK(dbenv, &dblp->reginfo);
  1040. if (IS_ZERO_LSN(lp->waiting_lsn) ||
  1041.     log_compare(&rp->lsn, &lp->waiting_lsn) < 0)
  1042. lp->waiting_lsn = rp->lsn;
  1043. R_UNLOCK(dbenv, &dblp->reginfo);
  1044. if (do_req) {
  1045. /* Request the LSN we are still waiting for. */
  1046. MUTEX_LOCK(dbenv, db_rep->mutexp);
  1047. /* May as well do this after we grab the mutex. */
  1048. eid = db_rep->region->master_id;
  1049. /*
  1050.  * If the master_id is invalid, this means that since
  1051.  * the last record was sent, somebody declared an
  1052.  * election and we may not have a master to request
  1053.  * things of.
  1054.  *
  1055.  * This is not an error;  when we find a new master,
  1056.  * we'll re-negotiate where the end of the log is and
  1057.  * try to to bring ourselves up to date again anyway.
  1058.  */
  1059. if (eid != DB_EID_INVALID) {
  1060. rep->stat.st_log_requested++;
  1061. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  1062. ret = __rep_send_message(dbenv,
  1063.     eid, REP_LOG_REQ, &next_lsn, NULL, 0);
  1064. } else
  1065. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  1066. }
  1067. return (ret);
  1068. } else {
  1069. R_UNLOCK(dbenv, &dblp->reginfo);
  1070. /*
  1071.  * We may miscount if we race, since we
  1072.  * don't currently hold the rep mutex.
  1073.  */
  1074. rep->stat.st_log_duplicated++;
  1075. }
  1076. if (ret != 0 || cmp < 0 || (cmp == 0 && IS_SIMPLE(rectype)))
  1077. goto done;
  1078. /*
  1079.  * If we got here, then we've got a log record in rp and rec that
  1080.  * we need to process.
  1081.  */
  1082. switch(rectype) {
  1083. case DB___dbreg_register:
  1084. /*
  1085.  * DB opens occur in the context of a transaction, so we can
  1086.  * simply handle them when we process the transaction.  Closes,
  1087.  * however, are not transaction-protected, so we have to
  1088.  * handle them here.
  1089.  *
  1090.  * Note that it should be unsafe for the master to do a close
  1091.  * of a file that was opened in an active transaction, so we
  1092.  * should be guaranteed to get the ordering right.
  1093.  */
  1094. memcpy(&txnid, (u_int8_t *)rec->data +
  1095.     ((u_int8_t *)&dbreg_args.txnid - (u_int8_t *)&dbreg_args),
  1096.     sizeof(u_int32_t));
  1097. if (txnid == TXN_INVALID &&
  1098.     !F_ISSET(dbenv, DB_ENV_REP_LOGSONLY))
  1099. ret = __db_dispatch(dbenv, dbenv->recover_dtab,
  1100.     dbenv->recover_dtab_size, rec, &rp->lsn,
  1101.     DB_TXN_APPLY, NULL);
  1102. break;
  1103. case DB___txn_ckp:
  1104. /* Sync the memory pool. */
  1105. memcpy(&ckp_lsn, (u_int8_t *)rec->data +
  1106.     ((u_int8_t *)&ckp_args.ckp_lsn - (u_int8_t *)&ckp_args),
  1107.     sizeof(DB_LSN));
  1108. if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY))
  1109. ret = dbenv->memp_sync(dbenv, &ckp_lsn);
  1110. else
  1111. /*
  1112.  * We ought to make sure the logs on a logs-only
  1113.  * replica get flushed now and again.
  1114.  */
  1115. ret = dbenv->log_flush(dbenv, &ckp_lsn);
  1116. /* Update the last_ckp in the txn region. */
  1117. if (ret == 0)
  1118. __txn_updateckp(dbenv, &rp->lsn);
  1119. break;
  1120. case DB___txn_regop:
  1121. if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY))
  1122. do {
  1123. /*
  1124.  * If an application is doing app-specific
  1125.  * recovery and acquires locks while applying
  1126.  * a transaction, it can deadlock.  Any other
  1127.  * locks held by this thread should have been
  1128.  * discarded in the __rep_process_txn error
  1129.  * path, so if we simply retry, we should
  1130.  * eventually succeed.
  1131.  */
  1132. ret = __rep_process_txn(dbenv, rec);
  1133. } while (ret == DB_LOCK_DEADLOCK);
  1134. break;
  1135. default:
  1136. goto err;
  1137. }
  1138. /* Check if we need to go back into the table. */
  1139. if (ret == 0) {
  1140. R_LOCK(dbenv, &dblp->reginfo);
  1141. if (log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0)
  1142. goto gap_check;
  1143. R_UNLOCK(dbenv, &dblp->reginfo);
  1144. }
  1145. done:
  1146. err: if (dbc != NULL && (t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
  1147. ret = t_ret;
  1148. if (have_mutex)
  1149. MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
  1150. if (control_dbt.data != NULL)
  1151. __os_ufree(dbenv, control_dbt.data);
  1152. if (rec_dbt.data != NULL)
  1153. __os_ufree(dbenv, rec_dbt.data);
  1154. return (ret);
  1155. }
  1156. /*
  1157.  * __rep_process_txn --
  1158.  *
  1159.  * This is the routine that actually gets a transaction ready for
  1160.  * processing.
  1161.  *
  1162.  * PUBLIC: int __rep_process_txn __P((DB_ENV *, DBT *));
  1163.  */
  1164. int
  1165. __rep_process_txn(dbenv, rec)
  1166. DB_ENV *dbenv;
  1167. DBT *rec;
  1168. {
  1169. DBT data_dbt;
  1170. DB_LOCKREQ req, *lvp;
  1171. DB_LOGC *logc;
  1172. DB_LSN prev_lsn, *lsnp;
  1173. DB_REP *db_rep;
  1174. LSN_COLLECTION lc;
  1175. REP *rep;
  1176. __txn_regop_args *txn_args;
  1177. __txn_xa_regop_args *prep_args;
  1178. u_int32_t lockid, op, rectype;
  1179. int i, ret, t_ret;
  1180. int (**dtab)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *));
  1181. size_t dtabsize;
  1182. void *txninfo;
  1183. db_rep = dbenv->rep_handle;
  1184. rep = db_rep->region;
  1185. logc = NULL;
  1186. txninfo = NULL;
  1187. memset(&data_dbt, 0, sizeof(data_dbt));
  1188. if (F_ISSET(dbenv, DB_ENV_THREAD))
  1189. F_SET(&data_dbt, DB_DBT_REALLOC);
  1190. /*
  1191.  * There are two phases:  First, we have to traverse
  1192.  * backwards through the log records gathering the list
  1193.  * of all LSNs in the transaction.  Once we have this information,
  1194.  * we can loop through, acquire the locks we need for each record,
  1195.  * and then apply it.
  1196.  */
  1197. dtab = NULL;
  1198. /*
  1199.  * We may be passed a prepare (if we're restoring a prepare
  1200.  * on upgrade) instead of a commit (the common case).
  1201.  * Check which and behave appropriately.
  1202.  */
  1203. memcpy(&rectype, rec->data, sizeof(rectype));
  1204. memset(&lc, 0, sizeof(lc));
  1205. if (rectype == DB___txn_regop) {
  1206. /*
  1207.  * We're the end of a transaction.  Make sure this is
  1208.  * really a commit and not an abort!
  1209.  */
  1210. if ((ret = __txn_regop_read(dbenv, rec->data, &txn_args)) != 0)
  1211. return (ret);
  1212. op = txn_args->opcode;
  1213. prev_lsn = txn_args->prev_lsn;
  1214. __os_free(dbenv, txn_args);
  1215. if (op != TXN_COMMIT)
  1216. return (0);
  1217. } else {
  1218. /* We're a prepare. */
  1219. DB_ASSERT(rectype == DB___txn_xa_regop);
  1220. if ((ret =
  1221.     __txn_xa_regop_read(dbenv, rec->data, &prep_args)) != 0)
  1222. return (ret);
  1223. prev_lsn = prep_args->prev_lsn;
  1224. __os_free(dbenv, prep_args);
  1225. }
  1226. /* Phase 1.  Get a list of the LSNs in this transaction, and sort it. */
  1227. if ((ret = __rep_collect_txn(dbenv, &prev_lsn, &lc)) != 0)
  1228. return (ret);
  1229. qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
  1230. if ((ret = dbenv->lock_id(dbenv, &lockid)) != 0)
  1231. goto err;
  1232. /* Initialize the getpgno dispatch table. */
  1233. if ((ret = __rep_lockpgno_init(dbenv, &dtab, &dtabsize)) != 0)
  1234. goto err;
  1235. /*
  1236.  * The set of records for a transaction may include dbreg_register
  1237.  * records.  Create a txnlist so that they can keep track of file
  1238.  * state between records.
  1239.  */
  1240. if ((ret = __db_txnlist_init(dbenv, 0, 0, NULL, &txninfo)) != 0)
  1241. goto err;
  1242. /* Phase 2: Apply updates. */
  1243. if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
  1244. goto err;
  1245. for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
  1246. if ((ret = __rep_lockpages(dbenv,
  1247.     dtab, dtabsize, lsnp, NULL, NULL, lockid)) != 0)
  1248. goto err;
  1249. if ((ret = logc->get(logc, lsnp, &data_dbt, DB_SET)) != 0)
  1250. goto err;
  1251. if ((ret = __db_dispatch(dbenv, dbenv->recover_dtab,
  1252.     dbenv->recover_dtab_size, &data_dbt, lsnp,
  1253.     DB_TXN_APPLY, txninfo)) != 0)
  1254. goto err;
  1255. }
  1256. err: memset(&req, 0, sizeof(req));
  1257. req.op = DB_LOCK_PUT_ALL;
  1258. if ((t_ret = dbenv->lock_vec(dbenv, lockid,
  1259.     DB_LOCK_FREE_LOCKER, &req, 1, &lvp)) != 0 && ret == 0)
  1260. ret = t_ret;
  1261. if (lc.nalloc != 0)
  1262. __os_free(dbenv, lc.array);
  1263. if ((t_ret =
  1264.     dbenv->lock_id_free(dbenv, lockid)) != 0 && ret == 0)
  1265. ret = t_ret;
  1266. if (logc != NULL && (t_ret = logc->close(logc, 0)) != 0 && ret == 0)
  1267. ret = t_ret;
  1268. if (txninfo != NULL)
  1269. __db_txnlist_end(dbenv, txninfo);
  1270. if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
  1271. __os_ufree(dbenv, data_dbt.data);
  1272. if (dtab != NULL)
  1273. __os_free(dbenv, dtab);
  1274. if (ret == 0)
  1275. /*
  1276.  * We don't hold the rep mutex, and could miscount if we race.
  1277.  */
  1278. rep->stat.st_txns_applied++;
  1279. return (ret);
  1280. }
  1281. /*
  1282.  * __rep_collect_txn
  1283.  * Recursive function that will let us visit every entry in a transaction
  1284.  * chain including all child transactions so that we can then apply
  1285.  * the entire transaction family at once.
  1286.  */
  1287. static int
  1288. __rep_collect_txn(dbenv, lsnp, lc)
  1289. DB_ENV *dbenv;
  1290. DB_LSN *lsnp;
  1291. LSN_COLLECTION *lc;
  1292. {
  1293. __txn_child_args *argp;
  1294. DB_LOGC *logc;
  1295. DB_LSN c_lsn;
  1296. DBT data;
  1297. u_int32_t rectype;
  1298. int nalloc, ret, t_ret;
  1299. memset(&data, 0, sizeof(data));
  1300. F_SET(&data, DB_DBT_REALLOC);
  1301. if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
  1302. return (ret);
  1303. while (!IS_ZERO_LSN(*lsnp) &&
  1304.     (ret = logc->get(logc, lsnp, &data, DB_SET)) == 0) {
  1305. memcpy(&rectype, data.data, sizeof(rectype));
  1306. if (rectype == DB___txn_child) {
  1307. if ((ret = __txn_child_read(dbenv,
  1308.     data.data, &argp)) != 0)
  1309. goto err;
  1310. c_lsn = argp->c_lsn;
  1311. *lsnp = argp->prev_lsn;
  1312. __os_free(dbenv, argp);
  1313. ret = __rep_collect_txn(dbenv, &c_lsn, lc);
  1314. } else {
  1315. if (lc->nalloc < lc->nlsns + 1) {
  1316. nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
  1317. if ((ret = __os_realloc(dbenv,
  1318.     nalloc * sizeof(DB_LSN), &lc->array)) != 0)
  1319. goto err;
  1320. lc->nalloc = nalloc;
  1321. }
  1322. lc->array[lc->nlsns++] = *lsnp;
  1323. /*
  1324.  * Explicitly copy the previous lsn.  The record
  1325.  * starts with a u_int32_t record type, a u_int32_t
  1326.  * txn id, and then the DB_LSN (prev_lsn) that we
  1327.  * want.  We copy explicitly because we have no idea
  1328.  * what kind of record this is.
  1329.  */
  1330. memcpy(lsnp, (u_int8_t *)data.data +
  1331.     sizeof(u_int32_t) + sizeof(u_int32_t),
  1332.     sizeof(DB_LSN));
  1333. }
  1334. if (ret != 0)
  1335. goto err;
  1336. }
  1337. err: if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
  1338. ret = t_ret;
  1339. if (data.data != NULL)
  1340. __os_ufree(dbenv, data.data);
  1341. return (ret);
  1342. }
  1343. /*
  1344.  * __rep_lsn_cmp --
  1345.  * qsort-type-compatible wrapper for log_compare.
  1346.  */
  1347. static int
  1348. __rep_lsn_cmp(lsn1, lsn2)
  1349. const void *lsn1, *lsn2;
  1350. {
  1351. return (log_compare((DB_LSN *)lsn1, (DB_LSN *)lsn2));
  1352. }
  1353. /*
  1354.  * __rep_newfile --
  1355.  * NEWFILE messages can contain either the last LSN of the old file
  1356.  * or the first LSN of the new one, depending on which we have available
  1357.  * when the message is sent.  When applying a NEWFILE message, make sure
  1358.  * we haven't already swapped files, as it's possible (given the right sequence
  1359.  * of out-of-order messages) to wind up with a NEWFILE message of each
  1360.  * variety, and __rep_apply won't detect the two as duplicates of each other.
  1361.  */
  1362. static int
  1363. __rep_newfile(dbenv, rc, msgdbt, lsnp)
  1364. DB_ENV *dbenv;
  1365. REP_CONTROL *rc;
  1366. DBT *msgdbt;
  1367. DB_LSN *lsnp;
  1368. {
  1369. DB_LOG *dblp;
  1370. LOG *lp;
  1371. u_int32_t newfile;
  1372. dblp = dbenv->lg_handle;
  1373. lp = dblp->reginfo.primary;
  1374. /*
  1375.  * A NEWFILE message containing the old file's LSN will be
  1376.  * accompanied by a NULL rec DBT;  one containing the new one's LSN
  1377.  * will need to supply the last record in the old file by
  1378.  * sending it in the rec DBT.
  1379.  */
  1380. if (msgdbt == NULL || msgdbt->size == 0)
  1381. newfile = rc->lsn.file + 1;
  1382. else
  1383. newfile = rc->lsn.file;
  1384. if (newfile > lp->lsn.file)
  1385. return (__log_newfile(dblp, lsnp));
  1386. else {
  1387. /* We've already applied this NEWFILE.  Just ignore it. */
  1388. *lsnp = lp->lsn;
  1389. return (0);
  1390. }
  1391. }