diff --git a/src/box/box.cc b/src/box/box.cc index ecc448919517c82faff2b5167728e19b7412cd94..ae4959d6f4ba13a11d596cc0abe93bf0cd58af53 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1801,6 +1801,9 @@ bootstrap(const struct tt_uuid *instance_uuid, /** * Recover the instance from the local directory. * Enter hot standby if the directory is locked. + * Invoke rebootstrap if the instance fell too much + * behind its peers in the replica set and needs + * to be rebootstrapped. */ static void local_recovery(const struct tt_uuid *instance_uuid, @@ -1836,6 +1839,12 @@ local_recovery(const struct tt_uuid *instance_uuid, if (wal_dir_lock >= 0) { box_listen(); box_sync_replication(replication_connect_timeout, false); + + struct replica *master; /* Written by replicaset_needs_rejoin() only when it returns true. */ + if (replicaset_needs_rejoin(&master)) { + say_crit("replica is too old, initiating rebootstrap"); + return bootstrap_from_master(master); + } } /* diff --git a/src/box/replication.cc b/src/box/replication.cc index c4d6e6f2690072fc725489731fa9441237c32161..bf7b8c225cdea1921050c798c1f1252734ad91f3 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -41,6 +41,7 @@ #include "error.h" #include "relay.h" #include "vclock.h" /* VCLOCK_MAX */ +#include "sio.h" /* sio_strfaddr */ uint32_t instance_id = REPLICA_ID_NIL; struct tt_uuid INSTANCE_UUID; @@ -625,6 +626,64 @@ replicaset_connect(struct applier **appliers, int count, "failed to connect to one or more replicas"); } +bool +replicaset_needs_rejoin(struct replica **master) +{ + struct replica *leader = NULL; /* Best rebootstrap candidate seen so far. */ + replicaset_foreach(replica) { + struct applier *applier = replica->applier; + if (applier == NULL) + continue; /* Replicas without an applier are skipped. */ + + const struct ballot *ballot = &applier->ballot; + if (vclock_compare(&ballot->gc_vclock, + &replicaset.vclock) <= 0) { + /* + * There's at least one master that still stores + * WALs needed by this instance. Proceed to local + * recovery.
+ */ + return false; + } + + const char *addr_str = sio_strfaddr(&applier->addr, + applier->addr_len); + char *local_vclock_str = vclock_to_string(&replicaset.vclock); + char *remote_vclock_str = vclock_to_string(&ballot->vclock); + char *gc_vclock_str = vclock_to_string(&ballot->gc_vclock); /* All three strings are released with free() at the next label, on both paths. */ + + say_info("can't follow %s: required %s available %s", + addr_str, local_vclock_str, gc_vclock_str); + + if (vclock_compare(&replicaset.vclock, &ballot->vclock) > 0) { + /* + * Replica has some rows that are not present on + * the master. Don't rebootstrap as we don't want + * to lose any data. + */ + say_info("can't rebootstrap from %s: " + "replica has local rows: local %s remote %s", + addr_str, local_vclock_str, remote_vclock_str); + goto next; + } + + /* Prefer a master with the max vclock, ranked by vclock_sum(). */ + if (leader == NULL || + vclock_sum(&ballot->vclock) > + vclock_sum(&leader->applier->ballot.vclock)) + leader = replica; +next: /* Shared cleanup for the skipped-master and the candidate paths. */ + free(local_vclock_str); + free(remote_vclock_str); + free(gc_vclock_str); + } + if (leader == NULL) + return false; /* No usable master found: *master is not written. */ + + *master = leader; + return true; +} + void replicaset_follow(void) { diff --git a/src/box/replication.h b/src/box/replication.h index fdf995c310ed250cfe8ca8fae5ffd944342d1dc8..e8b391af27c8298ae2f6e091cd35625f916448ba 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -359,6 +359,15 @@ void replicaset_connect(struct applier **appliers, int count, double timeout, bool connect_all); +/** + * Check if the current instance fell too much behind its + * peers in the replica set and needs to be rebootstrapped. + * If it does, return true and set @master to the instance to + * use for rebootstrap; otherwise return false without writing @master. + */ +bool +replicaset_needs_rejoin(struct replica **master); + /** * Resume all appliers registered with the replica set.
*/ diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result new file mode 100644 index 0000000000000000000000000000000000000000..b7563ed94990873ee927e3c90ab751015182bd80 --- /dev/null +++ b/test/replication/replica_rejoin.result @@ -0,0 +1,247 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +-- Cleanup the instance to remove vylog files left from previous +-- tests, since vinyl doesn't support rebootstrap yet. +test_run:cmd('restart server default with cleanup=1') +-- +-- gh-461: check that a replica refetches the last checkpoint +-- in case it fell behind the master. +-- +box.schema.user.grant('guest', 'replication') +--- +... +_ = box.schema.space.create('test') +--- +... +_ = box.space.test:create_index('pk') +--- +... +_ = box.space.test:insert{1} +--- +... +_ = box.space.test:insert{2} +--- +... +_ = box.space.test:insert{3} +--- +... +-- Join a replica, then stop it. +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.info.replication[1].upstream.status == 'follow' or box.info +--- +- true +... +box.space.test:select() +--- +- - [1] + - [2] + - [3] +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +-- Restart the server to purge the replica from +-- the garbage collection state. +test_run:cmd("restart server default") +-- Make some checkpoints to remove old xlogs. +checkpoint_count = box.cfg.checkpoint_count +--- +... +box.cfg{checkpoint_count = 1} +--- +... +_ = box.space.test:delete{1} +--- +... +_ = box.space.test:insert{10} +--- +... +box.snapshot() +--- +- ok +... +_ = box.space.test:delete{2} +--- +... +_ = box.space.test:insert{20} +--- +... +box.snapshot() +--- +- ok +... +_ = box.space.test:delete{3} +--- +... +_ = box.space.test:insert{30} +--- +...
+fio = require('fio') +--- +... +#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1 +--- +- 1 +... +box.cfg{checkpoint_count = checkpoint_count} +--- +... +-- Restart the replica. Since xlogs have been removed, +-- it is supposed to rejoin without changing id. +test_run:cmd("start server replica") +--- +- true +... +box.info.replication[2].downstream.vclock ~= nil or box.info +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.info.replication[1].upstream.status == 'follow' or box.info +--- +- true +... +box.space.test:select() +--- +- - [10] + - [20] + - [30] +... +test_run:cmd("switch default") +--- +- true +... +-- Make sure the replica follows new changes. +for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end +--- +... +vclock = test_run:get_vclock('default') +--- +... +_ = test_run:wait_vclock('replica', vclock) +--- +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test:select() +--- +- - [10, 10] + - [20, 20] + - [30, 30] +... +-- Check that restart works as usual. +test_run:cmd("restart server replica") +box.info.replication[1].upstream.status == 'follow' or box.info +--- +- true +... +box.space.test:select() +--- +- - [10, 10] + - [20, 20] + - [30, 30] +... +-- Check that rebootstrap is NOT initiated unless the replica +-- is strictly behind the master. +box.space.test:replace{1, 2, 3} -- bumps LSN on the replica +--- +- [1, 2, 3] +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("restart server default") +checkpoint_count = box.cfg.checkpoint_count +--- +... +box.cfg{checkpoint_count = 1} +--- +... +for i = 1, 3 do box.space.test:delete{i * 10} end +--- +... +box.snapshot() +--- +- ok +... +for i = 1, 3 do box.space.test:insert{i * 100} end +--- +... +fio = require('fio') +--- +... +#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1 +--- +- 1 +... +box.cfg{checkpoint_count = checkpoint_count} +--- +...
+test_run:cmd("start server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.info.status -- orphan +--- +- orphan +... +box.space.test:select() +--- +- - [1, 2, 3] + - [10, 10] + - [20, 20] + - [30, 30] +... +-- Cleanup. +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +box.space.test:drop() +--- +... +box.schema.user.revoke('guest', 'replication') +--- +... diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua new file mode 100644 index 0000000000000000000000000000000000000000..dfcb79cf8e83353fa767c13f073e44f4adfec322 --- /dev/null +++ b/test/replication/replica_rejoin.test.lua @@ -0,0 +1,92 @@ +env = require('test_run') +test_run = env.new() + +-- Cleanup the instance to remove vylog files left from previous +-- tests, since vinyl doesn't support rebootstrap yet. +test_run:cmd('restart server default with cleanup=1') + +-- +-- gh-461: check that a replica refetches the last checkpoint +-- in case it fell behind the master. +-- +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('test') +_ = box.space.test:create_index('pk') +_ = box.space.test:insert{1} +_ = box.space.test:insert{2} +_ = box.space.test:insert{3} + +-- Join a replica, then stop it. +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") +test_run:cmd("switch replica") +box.info.replication[1].upstream.status == 'follow' or box.info +box.space.test:select() +test_run:cmd("switch default") +test_run:cmd("stop server replica") + +-- Restart the server to purge the replica from +-- the garbage collection state. +test_run:cmd("restart server default") + +-- Make some checkpoints to remove old xlogs.
+checkpoint_count = box.cfg.checkpoint_count +box.cfg{checkpoint_count = 1} +_ = box.space.test:delete{1} +_ = box.space.test:insert{10} +box.snapshot() +_ = box.space.test:delete{2} +_ = box.space.test:insert{20} +box.snapshot() +_ = box.space.test:delete{3} +_ = box.space.test:insert{30} +fio = require('fio') +#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1 +box.cfg{checkpoint_count = checkpoint_count} + +-- Restart the replica. Since xlogs have been removed, +-- it is supposed to rejoin without changing id. +test_run:cmd("start server replica") +box.info.replication[2].downstream.vclock ~= nil or box.info +test_run:cmd("switch replica") +box.info.replication[1].upstream.status == 'follow' or box.info +box.space.test:select() +test_run:cmd("switch default") + +-- Make sure the replica follows new changes. +for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end +vclock = test_run:get_vclock('default') +_ = test_run:wait_vclock('replica', vclock) +test_run:cmd("switch replica") +box.space.test:select() + +-- Check that restart works as usual. +test_run:cmd("restart server replica") +box.info.replication[1].upstream.status == 'follow' or box.info +box.space.test:select() + +-- Check that rebootstrap is NOT initiated unless the replica +-- is strictly behind the master. +box.space.test:replace{1, 2, 3} -- bumps LSN on the replica +test_run:cmd("switch default") +test_run:cmd("stop server replica") +test_run:cmd("restart server default") +checkpoint_count = box.cfg.checkpoint_count +box.cfg{checkpoint_count = 1} +for i = 1, 3 do box.space.test:delete{i * 10} end +box.snapshot() +for i = 1, 3 do box.space.test:insert{i * 100} end +fio = require('fio') +#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1 +box.cfg{checkpoint_count = checkpoint_count} +test_run:cmd("start server replica") +test_run:cmd("switch replica") +box.info.status -- orphan +box.space.test:select() + +-- Cleanup.
+test_run:cmd("switch default") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") +box.space.test:drop() +box.schema.user.revoke('guest', 'replication') diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index 95e94e5a2a0a239acf5c8e96d3e5e0e8dccb64cb..2b609f16243344c6d5263bbb21eeef167671921f 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -6,6 +6,7 @@ "wal_off.test.lua": {}, "hot_standby.test.lua": {}, "rebootstrap.test.lua": {}, + "replica_rejoin.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"}