rep_method.c
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:29k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /*-
  2.  * See the file LICENSE for redistribution information.
  3.  *
  4.  * Copyright (c) 2001-2002
  5.  * Sleepycat Software.  All rights reserved.
  6.  */
  7. #include "db_config.h"
  8. #ifndef lint
  9. static const char revid[] = "$Id: rep_method.c,v 1.78 2002/09/10 12:58:07 bostic Exp $";
  10. #endif /* not lint */
  11. #ifndef NO_SYSTEM_INCLUDES
  12. #include <sys/types.h>
  13. #ifdef HAVE_RPC
  14. #include <rpc/rpc.h>
  15. #endif
  16. #include <stdlib.h>
  17. #include <string.h>
  18. #include <unistd.h>
  19. #endif
  20. #include "db_int.h"
  21. #include "dbinc/db_page.h"
  22. #include "dbinc/db_am.h"
  23. #include "dbinc/log.h"
  24. #include "dbinc/rep.h"
  25. #include "dbinc/txn.h"
  26. #ifdef HAVE_RPC
  27. #include "dbinc_auto/db_server.h"
  28. #include "dbinc_auto/rpc_client_ext.h"
  29. #endif
  30. static int __rep_abort_prepared __P((DB_ENV *));
  31. static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *));
  32. static int __rep_client_dbinit __P((DB_ENV *, int));
  33. static int __rep_elect __P((DB_ENV *, int, int, u_int32_t, int *));
  34. static int __rep_elect_init __P((DB_ENV *, DB_LSN *, int, int, int, int *));
  35. static int __rep_flush __P((DB_ENV *));
  36. static int __rep_restore_prepared __P((DB_ENV *));
  37. static int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t));
  38. static int __rep_set_request __P((DB_ENV *, u_int32_t, u_int32_t));
  39. static int __rep_set_rep_transport __P((DB_ENV *, int,
  40.     int (*)(DB_ENV *, const DBT *, const DBT *, int, u_int32_t)));
  41. static int __rep_start __P((DB_ENV *, DBT *, u_int32_t));
  42. static int __rep_stat __P((DB_ENV *, DB_REP_STAT **, u_int32_t));
  43. static int __rep_wait __P((DB_ENV *, u_int32_t, int *, u_int32_t));
  44. /*
  45.  * __rep_dbenv_create --
  46.  * Replication-specific initialization of the DB_ENV structure.
  47.  *
  48.  * PUBLIC: int __rep_dbenv_create __P((DB_ENV *));
  49.  */
  50. int
  51. __rep_dbenv_create(dbenv)
  52. DB_ENV *dbenv;
  53. {
  54. DB_REP *db_rep;
  55. int ret;
  56. #ifdef HAVE_RPC
  57. if (F_ISSET(dbenv, DB_ENV_RPCCLIENT)) {
  58. COMPQUIET(db_rep, NULL);
  59. COMPQUIET(ret, 0);
  60. dbenv->rep_elect = __dbcl_rep_elect;
  61. dbenv->rep_flush = __dbcl_rep_flush;
  62. dbenv->rep_process_message = __dbcl_rep_process_message;
  63. dbenv->rep_start = __dbcl_rep_start;
  64. dbenv->rep_stat = __dbcl_rep_stat;
  65. dbenv->set_rep_limit = __dbcl_rep_set_limit;
  66. dbenv->set_rep_request = __dbcl_rep_set_request;
  67. dbenv->set_rep_transport = __dbcl_rep_set_rep_transport;
  68. } else
  69. #endif
  70. {
  71. dbenv->rep_elect = __rep_elect;
  72. dbenv->rep_flush = __rep_flush;
  73. dbenv->rep_process_message = __rep_process_message;
  74. dbenv->rep_start = __rep_start;
  75. dbenv->rep_stat = __rep_stat;
  76. dbenv->set_rep_limit = __rep_set_limit;
  77. dbenv->set_rep_request = __rep_set_request;
  78. dbenv->set_rep_transport = __rep_set_rep_transport;
  79. /*
  80.  * !!!
  81.  * Our caller has not yet had the opportunity to reset the panic
  82.  * state or turn off mutex locking, and so we can neither check
  83.  * the panic state or acquire a mutex in the DB_ENV create path.
  84.  */
  85. if ((ret = __os_calloc(dbenv, 1, sizeof(DB_REP), &db_rep)) != 0)
  86. return (ret);
  87. dbenv->rep_handle = db_rep;
  88. /* Initialize the per-process replication structure. */
  89. db_rep->rep_send = NULL;
  90. }
  91. return (0);
  92. }
  93. /*
  94.  * __rep_start --
  95.  * Become a master or client, and start sending messages to participate
  96.  * in the replication environment.  Must be called after the environment
  97.  * is open.
  98.  */
  99. static int
  100. __rep_start(dbenv, dbt, flags)
  101. DB_ENV *dbenv;
  102. DBT *dbt;
  103. u_int32_t flags;
  104. {
  105. DB_LOG *dblp;
  106. DB_LSN lsn;
  107. DB_REP *db_rep;
  108. REP *rep;
  109. int announce, init_db, redo_prepared, ret;
  110. PANIC_CHECK(dbenv);
  111. ENV_ILLEGAL_BEFORE_OPEN(dbenv, "rep_start");
  112. ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_stat", DB_INIT_TXN);
  113. db_rep = dbenv->rep_handle;
  114. rep = db_rep->region;
  115. if ((ret = __db_fchk(dbenv, "DB_ENV->rep_start", flags,
  116.     DB_REP_CLIENT | DB_REP_LOGSONLY | DB_REP_MASTER)) != 0)
  117. return (ret);
  118. /* Exactly one of CLIENT and MASTER must be specified. */
  119. if ((ret = __db_fcchk(dbenv,
  120.     "DB_ENV->rep_start", flags, DB_REP_CLIENT, DB_REP_MASTER)) != 0)
  121. return (ret);
  122. if (!LF_ISSET(DB_REP_CLIENT | DB_REP_MASTER | DB_REP_LOGSONLY)) {
  123. __db_err(dbenv,
  124. "DB_ENV->rep_start: replication mode must be specified");
  125. return (EINVAL);
  126. }
  127. /* Masters can't be logs-only. */
  128. if ((ret = __db_fcchk(dbenv,
  129.     "DB_ENV->rep_start", flags, DB_REP_LOGSONLY, DB_REP_MASTER)) != 0)
  130. return (ret);
  131. /* We need a transport function. */
  132. if (db_rep->rep_send == NULL) {
  133. __db_err(dbenv,
  134.     "DB_ENV->set_rep_transport must be called before DB_ENV->rep_start");
  135. return (EINVAL);
  136. }
  137. /* We'd better not have any logged files open if we are a client. */
  138. if (LF_ISSET(DB_REP_CLIENT) && (ret = __dbreg_nofiles(dbenv)) != 0) {
  139. __db_err(dbenv, "DB_ENV->rep_start called with open files");
  140. return (ret);
  141. }
  142. MUTEX_LOCK(dbenv, db_rep->mutexp);
  143. if (rep->eid == DB_EID_INVALID)
  144. rep->eid = dbenv->rep_eid;
  145. if (LF_ISSET(DB_REP_MASTER)) {
  146. if (F_ISSET(dbenv, DB_ENV_REP_CLIENT)) {
  147. /*
  148.  * If we're upgrading from having been a client,
  149.  * preclose, so that we close our temporary database.
  150.  *
  151.  * Do not close files that we may have opened while
  152.  * doing a rep_apply;  they'll get closed when we
  153.  * finally close the environment, but for now, leave
  154.  * them open, as we don't want to recycle their
  155.  * fileids, and we may need the handles again if
  156.  * we become a client and the original master
  157.  * that opened them becomes a master again.
  158.  */
  159. if ((ret = __rep_preclose(dbenv, 0)) != 0)
  160. return (ret);
  161. /*
  162.  * Now write a __txn_recycle record so that
  163.  * clients don't get confused with our txnids
  164.  * and txnids of previous masters.
  165.  */
  166. F_CLR(dbenv, DB_ENV_REP_CLIENT);
  167. if ((ret = __txn_reset(dbenv)) != 0)
  168. return (ret);
  169. }
  170. redo_prepared = 0;
  171. if (!F_ISSET(rep, REP_F_MASTER)) {
  172. /* Master is not yet set. */
  173. if (F_ISSET(rep, REP_ISCLIENT)) {
  174. F_CLR(rep, REP_ISCLIENT);
  175. rep->gen = ++rep->w_gen;
  176. redo_prepared = 1;
  177. } else if (rep->gen == 0)
  178. rep->gen = 1;
  179. }
  180. F_SET(rep, REP_F_MASTER);
  181. F_SET(dbenv, DB_ENV_REP_MASTER);
  182. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  183. dblp = (DB_LOG *)dbenv->lg_handle;
  184. R_LOCK(dbenv, &dblp->reginfo);
  185. lsn = ((LOG *)dblp->reginfo.primary)->lsn;
  186. R_UNLOCK(dbenv, &dblp->reginfo);
  187. /*
  188.  * Send the NEWMASTER message, then restore prepared txns
  189.  * if and only if we just upgraded from being a client.
  190.  */
  191. if ((ret = __rep_send_message(dbenv,
  192.     DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0)) == 0 &&
  193.     redo_prepared)
  194. ret = __rep_restore_prepared(dbenv);
  195. } else {
  196. F_CLR(dbenv, DB_ENV_REP_MASTER);
  197. F_SET(dbenv, DB_ENV_REP_CLIENT);
  198. if (LF_ISSET(DB_REP_LOGSONLY))
  199. F_SET(dbenv, DB_ENV_REP_LOGSONLY);
  200. announce = !F_ISSET(rep, REP_ISCLIENT) ||
  201.     rep->master_id == DB_EID_INVALID;
  202. init_db = 0;
  203. if (!F_ISSET(rep, REP_ISCLIENT)) {
  204. F_CLR(rep, REP_F_MASTER);
  205. if (LF_ISSET(DB_REP_LOGSONLY))
  206. F_SET(rep, REP_F_LOGSONLY);
  207. else
  208. F_SET(rep, REP_F_UPGRADE);
  209. /*
  210.  * We initialize the client's generation number to 0.
  211.  * Upon startup, it looks for a master and updates the
  212.  * generation number as necessary, exactly as it does
  213.  * during normal operation and a master failure.
  214.  */
  215. rep->gen = 0;
  216. rep->master_id = DB_EID_INVALID;
  217. init_db = 1;
  218. }
  219. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  220. /*
  221.  * Abort any prepared transactions that were restored
  222.  * by recovery.  We won't be able to create any txns of
  223.  * our own until they're resolved, but we can't resolve
  224.  * them ourselves;  the master has to.  If any get
  225.  * resolved as commits, we'll redo them when commit
  226.  * records come in.  Aborts will simply be ignored.
  227.  */
  228. if ((ret = __rep_abort_prepared(dbenv)) != 0)
  229. return (ret);
  230. if ((ret = __rep_client_dbinit(dbenv, init_db)) != 0)
  231. return (ret);
  232. /*
  233.  * If this client created a newly replicated environment,
  234.  * then announce the existence of this client.  The master
  235.  * should respond with a message that will tell this client
  236.  * the current generation number and the current LSN.  This
  237.  * will allow the client to either perform recovery or
  238.  * simply join in.
  239.  */
  240. if (announce)
  241. ret = __rep_send_message(dbenv,
  242.     DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0);
  243. }
  244. return (ret);
  245. }
  246. /*
  247.  * __rep_client_dbinit --
  248.  *
  249.  * Initialize the LSN database on the client side.  This is called from the
  250.  * client initialization code.  The startup flag value indicates if
  251.  * this is the first thread/process starting up and therefore should create
  252.  * the LSN database.  This routine must be called once by each process acting
  253.  * as a client.
  254.  */
  255. static int
  256. __rep_client_dbinit(dbenv, startup)
  257. DB_ENV *dbenv;
  258. int startup;
  259. {
  260. DB_REP *db_rep;
  261. DB *dbp;
  262. int ret, t_ret;
  263. u_int32_t flags;
  264. PANIC_CHECK(dbenv);
  265. db_rep = dbenv->rep_handle;
  266. dbp = NULL;
  267. #define REPDBNAME "__db.rep.db"
  268. /* Check if this has already been called on this environment. */
  269. if (db_rep->rep_db != NULL)
  270. return (0);
  271. MUTEX_LOCK(dbenv, db_rep->db_mutexp);
  272. if (startup) {
  273. if ((ret = db_create(&dbp, dbenv, 0)) != 0)
  274. goto err;
  275. /*
  276.  * Ignore errors, because if the file doesn't exist, this
  277.  * is perfectly OK.
  278.  */
  279. (void)dbp->remove(dbp, REPDBNAME, NULL, 0);
  280. }
  281. if ((ret = db_create(&dbp, dbenv, 0)) != 0)
  282. goto err;
  283. if ((ret = dbp->set_bt_compare(dbp, __rep_bt_cmp)) != 0)
  284. goto err;
  285. /* Allow writes to this database on a client. */
  286. F_SET(dbp, DB_AM_CL_WRITER);
  287. flags = (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0) |
  288.     (startup ? DB_CREATE : 0);
  289. if ((ret = dbp->open(dbp, NULL,
  290.     "__db.rep.db", NULL, DB_BTREE, flags, 0)) != 0)
  291. goto err;
  292. db_rep->rep_db = dbp;
  293. if (0) {
  294. err: if (dbp != NULL &&
  295.     (t_ret = dbp->close(dbp, DB_NOSYNC)) != 0 && ret == 0)
  296. ret = t_ret;
  297. db_rep->rep_db = NULL;
  298. }
  299. MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
  300. return (ret);
  301. }
  302. /*
  303.  * __rep_bt_cmp --
  304.  *
  305.  * Comparison function for the LSN table.  We use the entire control
  306.  * structure as a key (for simplicity, so we don't have to merge the
  307.  * other fields in the control with the data field), but really only
  308.  * care about the LSNs.
  309.  */
  310. static int
  311. __rep_bt_cmp(dbp, dbt1, dbt2)
  312. DB *dbp;
  313. const DBT *dbt1, *dbt2;
  314. {
  315. DB_LSN lsn1, lsn2;
  316. REP_CONTROL *rp1, *rp2;
  317. COMPQUIET(dbp, NULL);
  318. rp1 = dbt1->data;
  319. rp2 = dbt2->data;
  320. __ua_memcpy(&lsn1, &rp1->lsn, sizeof(DB_LSN));
  321. __ua_memcpy(&lsn2, &rp2->lsn, sizeof(DB_LSN));
  322. if (lsn1.file > lsn2.file)
  323. return (1);
  324. if (lsn1.file < lsn2.file)
  325. return (-1);
  326. if (lsn1.offset > lsn2.offset)
  327. return (1);
  328. if (lsn1.offset < lsn2.offset)
  329. return (-1);
  330. return (0);
  331. }
  332. /*
  333.  * __rep_abort_prepared --
  334.  * Abort any prepared transactions that recovery restored.
  335.  *
  336.  * This is used by clients that have just run recovery, since
  337.  * they cannot/should not call txn_recover and handle prepared transactions
  338.  * themselves.
  339.  */
  340. static int
  341. __rep_abort_prepared(dbenv)
  342. DB_ENV *dbenv;
  343. {
  344. #define PREPLISTSIZE 50
  345. DB_PREPLIST prep[PREPLISTSIZE], *p;
  346. DB_TXNMGR *mgr;
  347. DB_TXNREGION *region;
  348. int do_aborts, ret;
  349. long count, i;
  350. u_int32_t op;
  351. mgr = dbenv->tx_handle;
  352. region = mgr->reginfo.primary;
  353. do_aborts = 0;
  354. R_LOCK(dbenv, &mgr->reginfo);
  355. if (region->stat.st_nrestores != 0)
  356. do_aborts = 1;
  357. R_UNLOCK(dbenv, &mgr->reginfo);
  358. if (do_aborts) {
  359. op = DB_FIRST;
  360. do {
  361. if ((ret = dbenv->txn_recover(dbenv,
  362.     prep, PREPLISTSIZE, &count, op)) != 0)
  363. return (ret);
  364. for (i = 0; i < count; i++) {
  365. p = &prep[i];
  366. if ((ret = p->txn->abort(p->txn)) != 0)
  367. return (ret);
  368. }
  369. op = DB_NEXT;
  370. } while (count == PREPLISTSIZE);
  371. }
  372. return (0);
  373. }
  374. /*
  375.  * __rep_restore_prepared --
  376.  * Restore to a prepared state any prepared but not yet committed
  377.  * transactions.
  378.  *
  379.  * This performs, in effect, a "mini-recovery";  it is called from
  380.  * __rep_start by newly upgraded masters.  There may be transactions that an
  381.  * old master prepared but did not resolve, which we need to restore to an
  382.  * active state.
  383.  */
  384. static int
  385. __rep_restore_prepared(dbenv)
  386. DB_ENV *dbenv;
  387. {
  388. DB_LOGC *logc;
  389. DB_LSN ckp_lsn, lsn;
  390. DBT rec;
  391. __txn_ckp_args *ckp_args;
  392. __txn_regop_args *regop_args;
  393. __txn_xa_regop_args *prep_args;
  394. int ret, t_ret;
  395. u_int32_t hi_txn, low_txn, rectype;
  396. void *txninfo;
  397. txninfo = NULL;
  398. ckp_args = NULL;
  399. prep_args = NULL;
  400. regop_args = NULL;
  401. ZERO_LSN(ckp_lsn);
  402. ZERO_LSN(lsn);
  403. if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
  404. return (ret);
  405. /*
  406.  * We need to consider the set of records between the most recent
  407.  * checkpoint LSN and the end of the log;  any txn in that
  408.  * range, and only txns in that range, could still have been
  409.  * active, and thus prepared but not yet committed (PBNYC),
  410.  * when the old master died.
  411.  *
  412.  * Find the most recent checkpoint LSN, and get the record there.
  413.  * If there is no checkpoint in the log, start off by getting
  414.  * the very first record in the log instead.
  415.  */
  416. memset(&rec, 0, sizeof(DBT));
  417. if ((ret = __txn_getckp(dbenv, &lsn)) == 0) {
  418. if ((ret = logc->get(logc, &lsn, &rec, DB_SET)) != 0)  {
  419. __db_err(dbenv,
  420.     "Checkpoint record at LSN [%lu][%lu] not found",
  421.     (u_long)lsn.file, (u_long)lsn.offset);
  422. goto err;
  423. }
  424. if ((ret = __txn_ckp_read(dbenv, rec.data, &ckp_args)) != 0) {
  425. __db_err(dbenv,
  426.     "Invalid checkpoint record at [%lu][%lu]",
  427.     (u_long)lsn.file, (u_long)lsn.offset);
  428. goto err;
  429. }
  430. ckp_lsn = ckp_args->ckp_lsn;
  431. __os_free(dbenv, ckp_args);
  432. if ((ret = logc->get(logc, &ckp_lsn, &rec, DB_SET)) != 0) {
  433. __db_err(dbenv,
  434.     "Checkpoint LSN record [%lu][%lu] not found",
  435.     (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
  436. goto err;
  437. }
  438. } else if ((ret = logc->get(logc, &lsn, &rec, DB_FIRST)) != 0) {
  439. if (ret == DB_NOTFOUND) {
  440. /* An empty log means no PBNYC txns. */
  441. ret = 0;
  442. goto done;
  443. }
  444. __db_err(dbenv, "Attempt to get first log record failed");
  445. goto err;
  446. }
  447. /*
  448.  * We use the same txnlist infrastructure that recovery does;
  449.  * it demands an estimate of the high and low txnids for
  450.  * initialization.
  451.  *
  452.  * First, the low txnid.
  453.  */
  454. do {
  455. /* txnid is after rectype, which is a u_int32. */
  456. memcpy(&low_txn,
  457.     (u_int8_t *)rec.data + sizeof(u_int32_t), sizeof(low_txn));
  458. if (low_txn != 0)
  459. break;
  460. } while ((ret = logc->get(logc, &lsn, &rec, DB_NEXT)) == 0);
  461. /* If there are no txns, there are no PBNYC txns. */
  462. if (ret == DB_NOTFOUND) {
  463. ret = 0;
  464. goto done;
  465. } else if (ret != 0)
  466. goto err;
  467. /* Now, the high txnid. */
  468. if ((ret = logc->get(logc, &lsn, &rec, DB_LAST)) != 0) {
  469. /*
  470.  * Note that DB_NOTFOUND is unacceptable here because we
  471.  * had to have looked at some log record to get this far.
  472.  */
  473. __db_err(dbenv, "Final log record not found");
  474. goto err;
  475. }
  476. do {
  477. /* txnid is after rectype, which is a u_int32. */
  478. memcpy(&hi_txn,
  479.     (u_int8_t *)rec.data + sizeof(u_int32_t), sizeof(hi_txn));
  480. if (hi_txn != 0)
  481. break;
  482. } while ((ret = logc->get(logc, &lsn, &rec, DB_PREV)) == 0);
  483. if (ret == DB_NOTFOUND) {
  484. ret = 0;
  485. goto done;
  486. } else if (ret != 0)
  487. goto err;
  488. /* We have a high and low txnid.  Initialise the txn list. */
  489. if ((ret =
  490.     __db_txnlist_init(dbenv, low_txn, hi_txn, NULL, &txninfo)) != 0)
  491. goto err;
  492. /*
  493.  * Now, walk backward from the end of the log to ckp_lsn.  Any
  494.  * prepares that we hit without first hitting a commit or
  495.  * abort belong to PBNYC txns, and we need to apply them and
  496.  * restore them to a prepared state.
  497.  *
  498.  * Note that we wind up applying transactions out of order.
  499.  * Since all PBNYC txns still held locks on the old master and
  500.  * were isolated, this should be safe.
  501.  */
  502. for (ret = logc->get(logc, &lsn, &rec, DB_LAST);
  503.     ret == 0 && log_compare(&lsn, &ckp_lsn) > 0;
  504.     ret = logc->get(logc, &lsn, &rec, DB_PREV)) {
  505. memcpy(&rectype, rec.data, sizeof(rectype));
  506. switch (rectype) {
  507. case DB___txn_regop:
  508. /*
  509.  * It's a commit or abort--but we don't care
  510.  * which!  Just add it to the list of txns
  511.  * that are resolved.
  512.  */
  513. if ((ret = __txn_regop_read(dbenv, rec.data,
  514.     &regop_args)) != 0)
  515. goto err;
  516. ret = __db_txnlist_find(dbenv,
  517.     txninfo, regop_args->txnid->txnid);
  518. if (ret == DB_NOTFOUND)
  519. ret = __db_txnlist_add(dbenv, txninfo,
  520.     regop_args->txnid->txnid,
  521.     regop_args->opcode, &lsn);
  522. __os_free(dbenv, regop_args);
  523. break;
  524. case DB___txn_xa_regop:
  525. /*
  526.  * It's a prepare.  If we haven't put the
  527.  * txn on our list yet, it hasn't been
  528.  * resolved, so apply and restore it.
  529.  */
  530. if ((ret = __txn_xa_regop_read(dbenv, rec.data,
  531.     &prep_args)) != 0)
  532. goto err;
  533. ret = __db_txnlist_find(dbenv, txninfo,
  534.     prep_args->txnid->txnid);
  535. if (ret == DB_NOTFOUND)
  536. if ((ret = __rep_process_txn(dbenv, &rec)) == 0)
  537. ret = __txn_restore_txn(dbenv,
  538.     &lsn, prep_args);
  539. __os_free(dbenv, prep_args);
  540. break;
  541. default:
  542. continue;
  543. }
  544. }
  545. /* It's not an error to have hit the beginning of the log. */
  546. if (ret == DB_NOTFOUND)
  547. ret = 0;
  548. done:
  549. err: t_ret = logc->close(logc, 0);
  550. if (txninfo != NULL)
  551. __db_txnlist_end(dbenv, txninfo);
  552. return (ret == 0 ? t_ret : ret);
  553. }
  554. /*
  555.  * __rep_set_limit --
  556.  * Set a limit on the amount of data that will be sent during a single
  557.  * invocation of __rep_process_message.
  558.  */
  559. static int
  560. __rep_set_limit(dbenv, gbytes, bytes)
  561. DB_ENV *dbenv;
  562. u_int32_t gbytes;
  563. u_int32_t bytes;
  564. {
  565. DB_REP *db_rep;
  566. REP *rep;
  567. PANIC_CHECK(dbenv);
  568. if ((db_rep = dbenv->rep_handle) == NULL) {
  569. __db_err(dbenv,
  570.     "DB_ENV->set_rep_limit: database environment not properly initialized");
  571. return (__db_panic(dbenv, EINVAL));
  572. }
  573. rep = db_rep->region;
  574. MUTEX_LOCK(dbenv, db_rep->mutexp);
  575. if (bytes > GIGABYTE) {
  576. gbytes += bytes / GIGABYTE;
  577. bytes = bytes % GIGABYTE;
  578. }
  579. rep->gbytes = gbytes;
  580. rep->bytes = bytes;
  581. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  582. return (0);
  583. }
  584. /*
  585.  * __rep_set_request --
  586.  * Set the minimum and maximum number of log records that we wait
  587.  * before retransmitting.
  588.  * UNDOCUMENTED.
  589.  */
  590. static int
  591. __rep_set_request(dbenv, min, max)
  592. DB_ENV *dbenv;
  593. u_int32_t min;
  594. u_int32_t max;
  595. {
  596. LOG *lp;
  597. DB_LOG *dblp;
  598. DB_REP *db_rep;
  599. REP *rep;
  600. PANIC_CHECK(dbenv);
  601. if ((db_rep = dbenv->rep_handle) == NULL) {
  602. __db_err(dbenv,
  603.     "DB_ENV->set_rep_request: database environment not properly initialized");
  604. return (__db_panic(dbenv, EINVAL));
  605. }
  606. rep = db_rep->region;
  607. MUTEX_LOCK(dbenv, db_rep->mutexp);
  608. rep->request_gap = min;
  609. rep->max_gap = max;
  610. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  611. dblp = dbenv->lg_handle;
  612. if (dblp != NULL && (lp = dblp->reginfo.primary) != NULL) {
  613. R_LOCK(dbenv, &dblp->reginfo);
  614. lp->wait_recs = 0;
  615. lp->rcvd_recs = 0;
  616. R_UNLOCK(dbenv, &dblp->reginfo);
  617. }
  618. return (0);
  619. }
  620. /*
  621.  * __rep_set_transport --
  622.  * Set the transport function for replication.
  623.  */
  624. static int
  625. __rep_set_rep_transport(dbenv, eid, f_send)
  626. DB_ENV *dbenv;
  627. int eid;
  628. int (*f_send) __P((DB_ENV *, const DBT *, const DBT *, int, u_int32_t));
  629. {
  630. DB_REP *db_rep;
  631. PANIC_CHECK(dbenv);
  632. if ((db_rep = dbenv->rep_handle) == NULL) {
  633. __db_err(dbenv,
  634.     "DB_ENV->set_rep_transport: database environment not properly initialized");
  635. return (__db_panic(dbenv, EINVAL));
  636. }
  637. if (f_send == NULL) {
  638. __db_err(dbenv,
  639. "DB_ENV->set_rep_transport: no send function specified");
  640. return (EINVAL);
  641. }
  642. if (eid < 0) {
  643. __db_err(dbenv,
  644. "DB_ENV->set_rep_transport: eid must be greater than or equal to 0");
  645. return (EINVAL);
  646. }
  647. db_rep->rep_send = f_send;
  648. dbenv->rep_eid = eid;
  649. return (0);
  650. }
  651. /*
  652.  * __rep_elect --
  653.  * Called after master failure to hold/participate in an election for
  654.  * a new master.
  655.  */
  656. static int
  657. __rep_elect(dbenv, nsites, priority, timeout, eidp)
  658. DB_ENV *dbenv;
  659. int nsites, priority;
  660. u_int32_t timeout;
  661. int *eidp;
  662. {
  663. DB_LOG *dblp;
  664. DB_LSN lsn;
  665. DB_REP *db_rep;
  666. REP *rep;
  667. int in_progress, ret, send_vote, tiebreaker;
  668. u_int32_t pid, sec, usec;
  669. PANIC_CHECK(dbenv);
  670. ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_elect", DB_INIT_TXN);
  671. /* Error checking. */
  672. if (nsites <= 0) {
  673. __db_err(dbenv,
  674.     "DB_ENV->rep_elect: nsites must be greater than 0");
  675. return (EINVAL);
  676. }
  677. if (priority < 0) {
  678. __db_err(dbenv,
  679.     "DB_ENV->rep_elect: priority may not be negative");
  680. return (EINVAL);
  681. }
  682. db_rep = dbenv->rep_handle;
  683. rep = db_rep->region;
  684. dblp = dbenv->lg_handle;
  685. R_LOCK(dbenv, &dblp->reginfo);
  686. lsn = ((LOG *)dblp->reginfo.primary)->lsn;
  687. R_UNLOCK(dbenv, &dblp->reginfo);
  688. /* Generate a randomized tiebreaker value. */
  689. __os_id(&pid);
  690. if ((ret = __os_clock(dbenv, &sec, &usec)) != 0)
  691. return (ret);
  692. tiebreaker = pid ^ sec ^ usec ^ (u_int)rand() ^ P_TO_UINT32(&pid);
  693. if ((ret = __rep_elect_init(dbenv,
  694.     &lsn, nsites, priority, tiebreaker, &in_progress)) != 0) {
  695. if (ret == DB_REP_NEWMASTER) {
  696. ret = 0;
  697. *eidp = dbenv->rep_eid;
  698. }
  699. return (ret);
  700. }
  701. if (!in_progress) {
  702. #ifdef DIAGNOSTIC
  703. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  704. __db_err(dbenv, "Beginning an election");
  705. #endif
  706. if ((ret = __rep_send_message(dbenv,
  707.     DB_EID_BROADCAST, REP_ELECT, NULL, NULL, 0)) != 0)
  708. goto err;
  709. DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTSEND, ret, NULL);
  710. }
  711. /* Now send vote */
  712. if ((ret =
  713.     __rep_send_vote(dbenv, &lsn, nsites, priority, tiebreaker)) != 0)
  714. goto err;
  715. DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTVOTE1, ret, NULL);
  716. ret = __rep_wait(dbenv, timeout, eidp, REP_F_EPHASE1);
  717. DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTWAIT1, ret, NULL);
  718. switch (ret) {
  719. case 0:
  720. /* Check if election complete or phase complete. */
  721. if (*eidp != DB_EID_INVALID)
  722. return (0);
  723. goto phase2;
  724. case DB_TIMEOUT:
  725. break;
  726. default:
  727. goto err;
  728. }
  729. /*
  730.  * If we got here, we haven't heard from everyone, but we've
  731.  * run out of time, so it's time to decide if we have enough
  732.  * votes to pick a winner and if so, to send out a vote to
  733.  * the winner.
  734.  */
  735. MUTEX_LOCK(dbenv, db_rep->mutexp);
  736. send_vote = DB_EID_INVALID;
  737. if (rep->sites > rep->nsites / 2) {
  738. /* We think we've seen enough to cast a vote. */
  739. send_vote = rep->winner;
  740. if (rep->winner == rep->eid)
  741. rep->votes++;
  742. F_CLR(rep, REP_F_EPHASE1);
  743. F_SET(rep, REP_F_EPHASE2);
  744. }
  745. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  746. if (send_vote == DB_EID_INVALID) {
  747. /* We do not have enough votes to elect. */
  748. #ifdef DIAGNOSTIC
  749. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  750. __db_err(dbenv,
  751.     "Not enough votes to elect: received %d of %d",
  752.     rep->sites, rep->nsites);
  753. #endif
  754. ret = DB_REP_UNAVAIL;
  755. goto err;
  756. }
  757. #ifdef DIAGNOSTIC
  758. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION) &&
  759.     send_vote != rep->eid)
  760. __db_err(dbenv, "Sending vote");
  761. #endif
  762. if (send_vote != rep->eid && (ret = __rep_send_message(dbenv,
  763.     send_vote, REP_VOTE2, NULL, NULL, 0)) != 0)
  764. goto err;
  765. DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTVOTE2, ret, NULL);
  766. phase2: ret = __rep_wait(dbenv, timeout, eidp, REP_F_EPHASE2);
  767. DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTWAIT2, ret, NULL);
  768. switch (ret) {
  769. case 0:
  770. return (0);
  771. case DB_TIMEOUT:
  772. ret = DB_REP_UNAVAIL;
  773. break;
  774. default:
  775. goto err;
  776. }
  777. DB_TEST_RECOVERY_LABEL
  778. err: MUTEX_LOCK(dbenv, db_rep->mutexp);
  779. ELECTION_DONE(rep);
  780. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  781. #ifdef DIAGNOSTIC
  782. if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
  783. __db_err(dbenv, "Ended election with %d", ret);
  784. #endif
  785. return (ret);
  786. }
  787. /*
  788.  * __rep_elect_init
  789.  * Initialize an election.  Sets beginp non-zero if the election is
  790.  * already in progress; makes it 0 otherwise.
  791.  */
  792. static int
  793. __rep_elect_init(dbenv, lsnp, nsites, priority, tiebreaker, beginp)
  794. DB_ENV *dbenv;
  795. DB_LSN *lsnp;
  796. int nsites, priority, tiebreaker, *beginp;
  797. {
  798. DB_REP *db_rep;
  799. REP *rep;
  800. int ret, *tally;
  801. db_rep = dbenv->rep_handle;
  802. rep = db_rep->region;
  803. ret = 0;
  804. /* We may miscount, as we don't hold the replication mutex here. */
  805. rep->stat.st_elections++;
  806. /* If we are already a master; simply broadcast that fact and return. */
  807. if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
  808. (void)__rep_send_message(dbenv,
  809.     DB_EID_BROADCAST, REP_NEWMASTER, lsnp, NULL, 0);
  810. rep->stat.st_elections_won++;
  811. return (DB_REP_NEWMASTER);
  812. }
  813. MUTEX_LOCK(dbenv, db_rep->mutexp);
  814. *beginp = IN_ELECTION(rep);
  815. if (!*beginp) {
  816. /*
  817.  * Make sure that we always initialize all the election fields
  818.  * before putting ourselves in an election state.  That means
  819.  * issuing calls that can fail (allocation) before setting all
  820.  * the variables.
  821.  */
  822. if (nsites > rep->asites &&
  823.     (ret = __rep_grow_sites(dbenv, nsites)) != 0)
  824. goto err;
  825. DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTINIT, ret, NULL);
  826. rep->nsites = nsites;
  827. rep->priority = priority;
  828. rep->votes = 0;
  829. rep->master_id = DB_EID_INVALID;
  830. F_SET(rep, REP_F_EPHASE1);
  831. /* We have always heard from ourselves. */
  832. rep->sites = 1;
  833. tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
  834. tally[0] = rep->eid;
  835. if (priority != 0) {
  836. /* Make ourselves the winner to start. */
  837. rep->winner = rep->eid;
  838. rep->w_priority = priority;
  839. rep->w_gen = rep->gen;
  840. rep->w_lsn = *lsnp;
  841. rep->w_tiebreaker = tiebreaker;
  842. } else {
  843. rep->winner = DB_EID_INVALID;
  844. rep->w_priority = 0;
  845. rep->w_gen = 0;
  846. ZERO_LSN(rep->w_lsn);
  847. rep->w_tiebreaker = 0;
  848. }
  849. }
  850. DB_TEST_RECOVERY_LABEL
  851. err: MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  852. return (ret);
  853. }
  854. static int
  855. __rep_wait(dbenv, timeout, eidp, flags)
  856. DB_ENV *dbenv;
  857. u_int32_t timeout;
  858. int *eidp;
  859. u_int32_t flags;
  860. {
  861. DB_REP *db_rep;
  862. REP *rep;
  863. int done, ret;
  864. u_int32_t sleeptime;
  865. done = 0;
  866. db_rep = dbenv->rep_handle;
  867. rep = db_rep->region;
  868. /*
  869.  * The user specifies an overall timeout function, but checking
  870.  * is cheap and the timeout may be a generous upper bound.
  871.  * Sleep repeatedly for the smaller of .5s and timeout/10.
  872.  */
  873. sleeptime = (timeout > 5000000) ? 500000 : timeout / 10;
  874. if (sleeptime == 0)
  875. sleeptime++;
  876. while (timeout > 0) {
  877. if ((ret = __os_sleep(dbenv, 0, sleeptime)) != 0)
  878. return (ret);
  879. MUTEX_LOCK(dbenv, db_rep->mutexp);
  880. done = !F_ISSET(rep, flags) && rep->master_id != DB_EID_INVALID;
  881. *eidp = rep->master_id;
  882. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  883. if (done)
  884. return (0);
  885. if (timeout > sleeptime)
  886. timeout -= sleeptime;
  887. else
  888. timeout = 0;
  889. }
  890. return (DB_TIMEOUT);
  891. }
  892. /*
  893.  * __rep_flush --
  894.  * Re-push the last log record to all clients, in case they've lost
  895.  * messages and don't know it.
  896.  */
  897. static int
  898. __rep_flush(dbenv)
  899. DB_ENV *dbenv;
  900. {
  901. DBT rec;
  902. DB_LOGC *logc;
  903. DB_LSN lsn;
  904. int ret, t_ret;
  905. PANIC_CHECK(dbenv);
  906. ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_stat", DB_INIT_TXN);
  907. if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
  908. return (ret);
  909. memset(&rec, 0, sizeof(rec));
  910. memset(&lsn, 0, sizeof(lsn));
  911. if ((ret = logc->get(logc, &lsn, &rec, DB_LAST)) != 0)
  912. goto err;
  913. ret = __rep_send_message(dbenv,
  914.     DB_EID_BROADCAST, REP_LOG, &lsn, &rec, 0);
  915. err: if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
  916. ret = t_ret;
  917. return (ret);
  918. }
  919. /*
  920.  * __rep_stat --
  921.  * Fetch replication statistics.
  922.  */
  923. static int
  924. __rep_stat(dbenv, statp, flags)
  925. DB_ENV *dbenv;
  926. DB_REP_STAT **statp;
  927. u_int32_t flags;
  928. {
  929. DB_LOG *dblp;
  930. DB_REP *db_rep;
  931. DB_REP_STAT *stats;
  932. LOG *lp;
  933. REP *rep;
  934. u_int32_t queued;
  935. int ret;
  936. PANIC_CHECK(dbenv);
  937. ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_stat", DB_INIT_TXN);
  938. db_rep = dbenv->rep_handle;
  939. rep = db_rep->region;
  940. dblp = dbenv->lg_handle;
  941. lp = dblp->reginfo.primary;
  942. *statp = NULL;
  943. if ((ret = __db_fchk(dbenv,
  944.     "DB_ENV->rep_stat", flags, DB_STAT_CLEAR)) != 0)
  945. return (ret);
  946. /* Allocate a stat struct to return to the user. */
  947. if ((ret = __os_umalloc(dbenv, sizeof(DB_REP_STAT), &stats)) != 0)
  948. return (ret);
  949. MUTEX_LOCK(dbenv, db_rep->mutexp);
  950. memcpy(stats, &rep->stat, sizeof(*stats));
  951. /* Copy out election stats. */
  952. if (IN_ELECTION(rep)) {
  953. if (F_ISSET(rep, REP_F_EPHASE1))
  954. stats->st_election_status = 1;
  955. else if (F_ISSET(rep, REP_F_EPHASE2))
  956. stats->st_election_status = 2;
  957. stats->st_election_nsites = rep->sites;
  958. stats->st_election_cur_winner = rep->winner;
  959. stats->st_election_priority = rep->w_priority;
  960. stats->st_election_gen = rep->w_gen;
  961. stats->st_election_lsn = rep->w_lsn;
  962. stats->st_election_votes = rep->votes;
  963. stats->st_election_tiebreaker = rep->w_tiebreaker;
  964. }
  965. /* Copy out other info that's protected by the rep mutex. */
  966. stats->st_env_id = rep->eid;
  967. stats->st_env_priority = rep->priority;
  968. stats->st_nsites = rep->nsites;
  969. stats->st_master = rep->master_id;
  970. stats->st_gen = rep->gen;
  971. if (F_ISSET(rep, REP_F_MASTER))
  972. stats->st_status = DB_REP_MASTER;
  973. else if (F_ISSET(rep, REP_F_LOGSONLY))
  974. stats->st_status = DB_REP_LOGSONLY;
  975. else if (F_ISSET(rep, REP_F_UPGRADE))
  976. stats->st_status = DB_REP_CLIENT;
  977. else
  978. stats->st_status = 0;
  979. if (LF_ISSET(DB_STAT_CLEAR)) {
  980. queued = rep->stat.st_log_queued;
  981. memset(&rep->stat, 0, sizeof(rep->stat));
  982. rep->stat.st_log_queued = rep->stat.st_log_queued_total =
  983.     rep->stat.st_log_queued_max = queued;
  984. }
  985. MUTEX_UNLOCK(dbenv, db_rep->mutexp);
  986. /*
  987.  * Log-related replication info is stored in the log system and
  988.  * protected by the log region lock.
  989.  */
  990. R_LOCK(dbenv, &dblp->reginfo);
  991. if (F_ISSET(rep, REP_ISCLIENT)) {
  992. stats->st_next_lsn = lp->ready_lsn;
  993. stats->st_waiting_lsn = lp->waiting_lsn;
  994. } else {
  995. if (F_ISSET(rep, REP_F_MASTER))
  996. stats->st_next_lsn = lp->lsn;
  997. else
  998. ZERO_LSN(stats->st_next_lsn);
  999. ZERO_LSN(stats->st_waiting_lsn);
  1000. }
  1001. R_UNLOCK(dbenv, &dblp->reginfo);
  1002. *statp = stats;
  1003. return (0);
  1004. }