Skip to content
Snippets Groups Projects
Commit ea69a0cd authored by Vladimir Davydov's avatar Vladimir Davydov Committed by Konstantin Osipov
Browse files

replication: rebootstrap instance on startup if it fell behind

If a replica fell too much behind its peers in the cluster and xlog
files needed for it to get up to speed have been removed, it won't be
able to proceed without rebootstrap. This patch makes the recovery
procedure detect such cases and initiate rebootstrap procedure if
necessary.

Note, rebootstrap is currently only supported by memtx engine. If there
are vinyl spaces on the replica, rebootstrap will fail. This is fixed by
the following patches.

Part of #461
parent 0ecabde8
No related branches found
No related tags found
No related merge requests found
......@@ -1801,6 +1801,9 @@ bootstrap(const struct tt_uuid *instance_uuid,
/**
* Recover the instance from the local directory.
* Enter hot standby if the directory is locked.
* Invoke rebootstrap if the instance fell too much
* behind its peers in the replica set and needs
* to be rebootstrapped.
*/
static void
local_recovery(const struct tt_uuid *instance_uuid,
......@@ -1836,6 +1839,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
if (wal_dir_lock >= 0) {
box_listen();
box_sync_replication(replication_connect_timeout, false);
struct replica *master;
if (replicaset_needs_rejoin(&master)) {
say_crit("replica is too old, initiating rebootstrap");
return bootstrap_from_master(master);
}
}
/*
......
......@@ -41,6 +41,7 @@
#include "error.h"
#include "relay.h"
#include "vclock.h" /* VCLOCK_MAX */
#include "sio.h"
uint32_t instance_id = REPLICA_ID_NIL;
struct tt_uuid INSTANCE_UUID;
......@@ -625,6 +626,64 @@ replicaset_connect(struct applier **appliers, int count,
"failed to connect to one or more replicas");
}
bool
replicaset_needs_rejoin(struct replica **master)
{
struct replica *leader = NULL;
replicaset_foreach(replica) {
struct applier *applier = replica->applier;
if (applier == NULL)
continue;
const struct ballot *ballot = &applier->ballot;
if (vclock_compare(&ballot->gc_vclock,
&replicaset.vclock) <= 0) {
/*
* There's at least one master that still stores
* WALs needed by this instance. Proceed to local
* recovery.
*/
return false;
}
const char *addr_str = sio_strfaddr(&applier->addr,
applier->addr_len);
char *local_vclock_str = vclock_to_string(&replicaset.vclock);
char *remote_vclock_str = vclock_to_string(&ballot->vclock);
char *gc_vclock_str = vclock_to_string(&ballot->gc_vclock);
say_info("can't follow %s: required %s available %s",
addr_str, local_vclock_str, gc_vclock_str);
if (vclock_compare(&replicaset.vclock, &ballot->vclock) > 0) {
/*
* Replica has some rows that are not present on
* the master. Don't rebootstrap as we don't want
* to lose any data.
*/
say_info("can't rebootstrap from %s: "
"replica has local rows: local %s remote %s",
addr_str, local_vclock_str, remote_vclock_str);
goto next;
}
/* Prefer a master with the max vclock. */
if (leader == NULL ||
vclock_sum(&ballot->vclock) >
vclock_sum(&leader->applier->ballot.vclock))
leader = replica;
next:
free(local_vclock_str);
free(remote_vclock_str);
free(gc_vclock_str);
}
if (leader == NULL)
return false;
*master = leader;
return true;
}
void
replicaset_follow(void)
{
......
......@@ -359,6 +359,15 @@ void
replicaset_connect(struct applier **appliers, int count,
double timeout, bool connect_all);
/**
* Check if the current instance fell too much behind its
* peers in the replica set and needs to be rebootstrapped.
* If it does, return true and set @master to the instance
* to use for rebootstrap, otherwise return false.
*/
bool
replicaset_needs_rejoin(struct replica **master);
/**
* Resume all appliers registered with the replica set.
*/
......
env = require('test_run')
---
...
test_run = env.new()
---
...
-- Cleanup the instance to remove vylog files left from previous
-- tests, since vinyl doesn't support rebootstrap yet.
test_run:cmd('restart server default with cleanup=1')
--
-- gh-461: check that a replica refetches the last checkpoint
-- in case it fell behind the master.
--
box.schema.user.grant('guest', 'replication')
---
...
_ = box.schema.space.create('test')
---
...
_ = box.space.test:create_index('pk')
---
...
_ = box.space.test:insert{1}
---
...
_ = box.space.test:insert{2}
---
...
_ = box.space.test:insert{3}
---
...
-- Join a replica, then stop it.
test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
---
- true
...
test_run:cmd("start server replica")
---
- true
...
test_run:cmd("switch replica")
---
- true
...
box.info.replication[1].upstream.status == 'follow' or box.info
---
- true
...
box.space.test:select()
---
- - [1]
- [2]
- [3]
...
test_run:cmd("switch default")
---
- true
...
test_run:cmd("stop server replica")
---
- true
...
-- Restart the server to purge the replica from
-- the garbage collection state.
test_run:cmd("restart server default")
-- Make some checkpoints to remove old xlogs.
checkpoint_count = box.cfg.checkpoint_count
---
...
box.cfg{checkpoint_count = 1}
---
...
_ = box.space.test:delete{1}
---
...
_ = box.space.test:insert{10}
---
...
box.snapshot()
---
- ok
...
_ = box.space.test:delete{2}
---
...
_ = box.space.test:insert{20}
---
...
box.snapshot()
---
- ok
...
_ = box.space.test:delete{3}
---
...
_ = box.space.test:insert{30}
---
...
fio = require('fio')
---
...
#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
---
- 1
...
box.cfg{checkpoint_count = checkpoint_count}
---
...
-- Restart the replica. Since xlogs have been removed,
-- it is supposed to rejoin without changing id.
test_run:cmd("start server replica")
---
- true
...
box.info.replication[2].downstream.vclock ~= nil or box.info
---
- true
...
test_run:cmd("switch replica")
---
- true
...
box.info.replication[1].upstream.status == 'follow' or box.info
---
- true
...
box.space.test:select()
---
- - [10]
- [20]
- [30]
...
test_run:cmd("switch default")
---
- true
...
-- Make sure the replica follows new changes.
for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end
---
...
vclock = test_run:get_vclock('default')
---
...
_ = test_run:wait_vclock('replica', vclock)
---
...
test_run:cmd("switch replica")
---
- true
...
box.space.test:select()
---
- - [10, 10]
- [20, 20]
- [30, 30]
...
-- Check that restart works as usual.
test_run:cmd("restart server replica")
box.info.replication[1].upstream.status == 'follow' or box.info
---
- true
...
box.space.test:select()
---
- - [10, 10]
- [20, 20]
- [30, 30]
...
-- Check that rebootstrap is NOT initiated unless the replica
-- is strictly behind the master.
box.space.test:replace{1, 2, 3} -- bumps LSN on the replica
---
- [1, 2, 3]
...
test_run:cmd("switch default")
---
- true
...
test_run:cmd("stop server replica")
---
- true
...
test_run:cmd("restart server default")
checkpoint_count = box.cfg.checkpoint_count
---
...
box.cfg{checkpoint_count = 1}
---
...
for i = 1, 3 do box.space.test:delete{i * 10} end
---
...
box.snapshot()
---
- ok
...
for i = 1, 3 do box.space.test:insert{i * 100} end
---
...
fio = require('fio')
---
...
#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
---
- 1
...
box.cfg{checkpoint_count = checkpoint_count}
---
...
test_run:cmd("start server replica")
---
- true
...
test_run:cmd("switch replica")
---
- true
...
box.info.status -- orphan
---
- orphan
...
box.space.test:select()
---
- - [1, 2, 3]
- [10, 10]
- [20, 20]
- [30, 30]
...
-- Cleanup.
test_run:cmd("switch default")
---
- true
...
test_run:cmd("stop server replica")
---
- true
...
test_run:cmd("cleanup server replica")
---
- true
...
box.space.test:drop()
---
...
box.schema.user.revoke('guest', 'replication')
---
...
env = require('test_run')
test_run = env.new()
-- Cleanup the instance to remove vylog files left from previous
-- tests, since vinyl doesn't support rebootstrap yet.
test_run:cmd('restart server default with cleanup=1')
--
-- gh-461: check that a replica refetches the last checkpoint
-- in case it fell behind the master.
--
box.schema.user.grant('guest', 'replication')
_ = box.schema.space.create('test')
_ = box.space.test:create_index('pk')
_ = box.space.test:insert{1}
_ = box.space.test:insert{2}
_ = box.space.test:insert{3}
-- Join a replica, then stop it.
test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
test_run:cmd("start server replica")
test_run:cmd("switch replica")
box.info.replication[1].upstream.status == 'follow' or box.info
box.space.test:select()
test_run:cmd("switch default")
test_run:cmd("stop server replica")
-- Restart the server to purge the replica from
-- the garbage collection state.
test_run:cmd("restart server default")
-- Make some checkpoints to remove old xlogs.
checkpoint_count = box.cfg.checkpoint_count
box.cfg{checkpoint_count = 1}
_ = box.space.test:delete{1}
_ = box.space.test:insert{10}
box.snapshot()
_ = box.space.test:delete{2}
_ = box.space.test:insert{20}
box.snapshot()
_ = box.space.test:delete{3}
_ = box.space.test:insert{30}
fio = require('fio')
#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
box.cfg{checkpoint_count = checkpoint_count}
-- Restart the replica. Since xlogs have been removed,
-- it is supposed to rejoin without changing id.
test_run:cmd("start server replica")
box.info.replication[2].downstream.vclock ~= nil or box.info
test_run:cmd("switch replica")
box.info.replication[1].upstream.status == 'follow' or box.info
box.space.test:select()
test_run:cmd("switch default")
-- Make sure the replica follows new changes.
for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end
vclock = test_run:get_vclock('default')
_ = test_run:wait_vclock('replica', vclock)
test_run:cmd("switch replica")
box.space.test:select()
-- Check that restart works as usual.
test_run:cmd("restart server replica")
box.info.replication[1].upstream.status == 'follow' or box.info
box.space.test:select()
-- Check that rebootstrap is NOT initiated unless the replica
-- is strictly behind the master.
box.space.test:replace{1, 2, 3} -- bumps LSN on the replica
test_run:cmd("switch default")
test_run:cmd("stop server replica")
test_run:cmd("restart server default")
checkpoint_count = box.cfg.checkpoint_count
box.cfg{checkpoint_count = 1}
for i = 1, 3 do box.space.test:delete{i * 10} end
box.snapshot()
for i = 1, 3 do box.space.test:insert{i * 100} end
fio = require('fio')
#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
box.cfg{checkpoint_count = checkpoint_count}
test_run:cmd("start server replica")
test_run:cmd("switch replica")
box.info.status -- orphan
box.space.test:select()
-- Cleanup.
test_run:cmd("switch default")
test_run:cmd("stop server replica")
test_run:cmd("cleanup server replica")
box.space.test:drop()
box.schema.user.revoke('guest', 'replication')
......@@ -6,6 +6,7 @@
"wal_off.test.lua": {},
"hot_standby.test.lua": {},
"rebootstrap.test.lua": {},
"replica_rejoin.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment