Commit 113ade24 authored by Vladimir Davydov, committed by Kirill Yukhin

box: sync on replication configuration update

Now box.cfg() doesn't return until 'quorum' appliers are in sync, not
only on initial configuration but also on a replication configuration
update. If it fails to synchronize within replication_sync_timeout,
box.cfg() returns without an error, but the instance enters 'orphan'
state, which is basically read-only mode. In the meantime, appliers
will keep trying to synchronize in the background, and the instance
will leave 'orphan' state as soon as enough appliers are in sync.
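
For illustration, a hedged sketch of the reconfiguration path described
above (the replica URI and timeout value are made up; the box.cfg options
and box.info fields are the ones named in this message and in the test
added below):

    -- Reconfigure replication with a short sync timeout.
    box.cfg{
        replication = {'replicator@192.168.0.101:3301'},  -- hypothetical URI
        replication_sync_timeout = 0.1,                    -- illustrative value
    }
    -- box.cfg() returns without an error even if syncing timed out;
    -- in that case the instance stays read-only until appliers catch up.
    box.info.status  -- 'orphan' until enough appliers are in sync
    box.info.ro      -- true while in 'orphan' state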

Note, this patch also changes logging a bit:
 - 'ready to accept requests' is printed on startup before syncing
   with the replica set, because although the instance is read-only
   at that time, it can indeed accept all sorts of ro requests.
 - For the 'connecting', 'connected', and 'synchronizing' messages, we
   now use the 'info' logging level instead of 'verbose', because these
   messages are important: they give the admin an idea of what's going
   on with the instance, and they can't flood the logs.
 - The 'sync complete' message is also printed at the 'info' level, not
   'crit', because there's nothing critical about it (it's not an error).

Also note that we only enter 'orphan' state if we failed to synchronize.
In particular, if the instance manages to synchronize with all replicas
within the timeout, it will jump from 'loading' straight into 'running',
bypassing 'orphan' state. This is done for the sake of consistency
between initial configuration and reconfiguration.
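
A minimal sketch of the success path (the quorum and timeout values below
are illustrative, not defaults taken from this patch):

    -- If the appliers synchronize within replication_sync_timeout,
    -- box.cfg() returns only after they are in sync, and the instance
    -- ends up 'running' without entering 'orphan' state.
    box.cfg{
        replication = replication,       -- previously saved peer URIs
        replication_connect_quorum = 1,  -- illustrative value
        replication_sync_timeout = 300,  -- generous timeout, in seconds
    }
    box.info.status  -- 'running'
    box.info.ro      -- false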

Closes #3427

@TarantoolBot document
Title: Sync on replication configuration update
The behavior of box.cfg() on a replication configuration update is
now consistent with initial configuration: box.cfg() will not return
until the instance synchronizes with as many masters as specified by
the replication_connect_quorum configuration option, or until the
timeout specified by replication_sync_timeout expires. On timeout,
box.cfg() returns without an error, but the instance enters 'orphan'
state. It leaves 'orphan' state as soon as enough appliers have synced.
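
A usage sketch for the timeout case (it mirrors the sync test added by
this patch; the polling loop is just one way to wait for the sync):

    fiber = require('fiber')
    box.cfg{replication = replication, replication_sync_timeout = 0.001}
    -- box.cfg() returned without an error even though syncing timed out.
    if box.info.status == 'orphan' then
        -- Appliers keep syncing in the background; poll until the
        -- instance becomes writable again.
        repeat fiber.sleep(0.01) until box.info.status ~= 'orphan'
    end
    box.info.ro  -- false once the replica set is in sync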
parent ca9fc33a
@@ -110,7 +110,7 @@ static fiber_cond ro_cond;
* synchronize to a sufficient number of replicas to form
* a quorum and so was forced to switch to read-only mode.
*/
static bool is_orphan = true;
static bool is_orphan;
/* Use the shared instance of xstream for all appliers */
static struct xstream join_stream;
@@ -219,16 +219,22 @@ box_wait_ro(bool ro, double timeout)
}
void
box_clear_orphan(void)
box_set_orphan(bool orphan)
{
if (!is_orphan)
if (is_orphan == orphan)
return; /* nothing to do */
is_orphan = false;
is_orphan = orphan;
fiber_cond_broadcast(&ro_cond);
/* Update the title to reflect the new status. */
title("running");
if (is_orphan) {
say_crit("entering orphan mode");
title("orphan");
} else {
say_crit("leaving orphan mode");
title("running");
}
}
struct wal_stream {
@@ -646,6 +652,8 @@ box_set_replication(void)
box_sync_replication(true);
/* Follow replica */
replicaset_follow();
/* Wait until appliers are in sync */
replicaset_sync();
}
void
@@ -1893,8 +1901,6 @@ box_cfg_xc(void)
/** Begin listening only when the local recovery is complete. */
box_listen();
title("orphan");
/*
* In case of recovering from a checkpoint we
* don't need to wait for 'quorum' masters, since
@@ -1913,8 +1919,6 @@ box_cfg_xc(void)
*/
box_listen();
title("orphan");
/*
* Wait for the cluster to start up.
*
@@ -1951,25 +1955,17 @@ box_cfg_xc(void)
rmean_cleanup(rmean_box);
/*
* If this instance is a leader of a newly bootstrapped
* cluster, it is uptodate by definition so leave the
* 'orphan' mode right away to let it initialize cluster
* schema.
*/
if (is_bootstrap_leader)
box_clear_orphan();
/* Follow replica */
replicaset_follow();
fiber_gc();
is_box_configured = true;
title("running");
say_info("ready to accept requests");
if (!is_bootstrap_leader)
replicaset_sync();
say_info("ready to accept requests");
}
void
......
@@ -100,12 +100,16 @@ int
box_wait_ro(bool ro, double timeout);
/**
* Switch this instance from 'orphan' to 'running' state.
* Called on initial configuration as soon as this instance
* synchronizes with enough replicas to form a quorum.
* Switch this instance from 'orphan' to 'running' state or
* vice versa depending on the value of the function argument.
*
* An instance enters 'orphan' state on returning from box.cfg()
* if it failed to synchronize with 'quorum' replicas within a
* specified timeout. It will keep trying to synchronize in the
* background and leave 'orphan' state once it's done.
*/
void
box_clear_orphan(void);
box_set_orphan(bool orphan);
/** True if snapshot is in progress. */
extern bool box_checkpoint_is_in_progress;
......
@@ -548,7 +548,8 @@ replicaset_connect(struct applier **appliers, int count,
replicaset_update(appliers, count);
return;
}
say_verbose("connecting to %d replicas", count);
say_info("connecting to %d replicas", count);
/*
* Simultaneously connect to remote peers to receive their UUIDs
@@ -602,7 +603,7 @@ replicaset_connect(struct applier **appliers, int count,
if (connect_quorum && state.connected < quorum)
goto error;
} else {
say_verbose("connected to %d replicas", state.connected);
say_info("connected to %d replicas", state.connected);
}
for (int i = 0; i < count; i++) {
@@ -636,13 +637,6 @@ replicaset_connect(struct applier **appliers, int count,
void
replicaset_follow(void)
{
if (replicaset.applier.total == 0) {
/*
* Replication is not configured.
*/
box_clear_orphan();
return;
}
struct replica *replica;
replicaset_foreach(replica) {
/* Resume connected appliers. */
@@ -653,13 +647,6 @@ replicaset_follow(void)
/* Restart appliers that failed to connect. */
applier_start(replica->applier);
}
if (replicaset_quorum() == 0) {
/*
* Leaving orphan mode immediately since
* replication_connect_quorum is set to 0.
*/
box_clear_orphan();
}
}
void
@@ -667,10 +654,16 @@ replicaset_sync(void)
{
int quorum = replicaset_quorum();
if (quorum == 0)
if (quorum == 0) {
/*
* Quorum is 0 or replication is not configured.
* Leaving 'orphan' state immediately.
*/
box_set_orphan(false);
return;
}
say_verbose("synchronizing with %d replicas", quorum);
say_info("synchronizing with %d replicas", quorum);
/*
* Wait until all connected replicas synchronize up to
@@ -691,22 +684,21 @@ replicaset_sync(void)
* Do not stall configuration, leave the instance
* in 'orphan' state.
*/
say_crit("entering orphan mode");
return;
say_crit("failed to synchronize with %d out of %d replicas",
replicaset.applier.total - replicaset.applier.synced,
replicaset.applier.total);
box_set_orphan(true);
} else {
say_info("replica set sync complete");
box_set_orphan(false);
}
say_crit("replica set sync complete, quorum of %d "
"replicas formed", quorum);
}
void
replicaset_check_quorum(void)
{
if (replicaset.applier.synced >= replicaset_quorum()) {
if (replicaset_quorum() > 0)
say_crit("leaving orphan mode");
box_clear_orphan();
}
if (replicaset.applier.synced >= replicaset_quorum())
box_set_orphan(false);
}
void
......
@@ -3,7 +3,7 @@ core = tarantool
script = master.lua
description = tarantool/box, replication
disabled = consistent.test.lua
release_disabled = catch.test.lua errinj.test.lua gc.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua
release_disabled = catch.test.lua errinj.test.lua gc.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua
config = suite.cfg
lua_libs = lua/fast_replica.lua lua/rlimit.lua
long_run = prune.test.lua
......
fiber = require('fiber')
---
...
test_run = require('test_run').new()
---
...
engine = test_run:get_cfg('engine')
---
...
box.schema.user.grant('guest', 'replication')
---
...
_ = box.schema.space.create('test', {engine = engine})
---
...
_ = box.space.test:create_index('pk')
---
...
-- Slow down replication a little to test replication_sync_lag.
box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.001)
---
- ok
...
-- Helper that adds some records to the space and then starts
-- a fiber to add more records in the background.
test_run:cmd("setopt delimiter ';'")
---
- true
...
count = 0;
---
...
function fill()
for i = count + 1, count + 100 do
box.space.test:replace{i}
end
fiber.create(function()
for i = count + 101, count + 200 do
box.space.test:replace{i}
fiber.sleep(0.0001)
end
end)
fiber.sleep(0.001)
count = count + 200
end;
---
...
test_run:cmd("setopt delimiter ''");
---
- true
...
-- Deploy a replica.
test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
---
- true
...
test_run:cmd("start server replica")
---
- true
...
test_run:cmd("switch replica")
---
- true
...
fiber = require('fiber')
---
...
-- Stop replication.
replication = box.cfg.replication
---
...
box.cfg{replication = {}}
---
...
-- Fill the space.
test_run:cmd("switch default")
---
- true
...
fill()
---
...
test_run:cmd("switch replica")
---
- true
...
-- Resume replication.
--
-- Since max allowed lag is small, all records should arrive
-- by the time box.cfg() returns.
--
box.cfg{replication_sync_lag = 0.001}
---
...
box.cfg{replication = replication}
---
...
box.space.test:count()
---
- 200
...
box.info.status -- running
---
- running
...
box.info.ro -- false
---
- false
...
-- Stop replication.
replication = box.cfg.replication
---
...
box.cfg{replication = {}}
---
...
-- Fill the space.
test_run:cmd("switch default")
---
- true
...
fill()
---
...
test_run:cmd("switch replica")
---
- true
...
-- Resume replication
--
-- Since max allowed lag is big, not all records will arrive
-- upon returning from box.cfg() but the instance won't enter
-- orphan state.
--
box.cfg{replication_sync_lag = 1}
---
...
box.cfg{replication = replication}
---
...
box.space.test:count() < 400
---
- true
...
box.info.status -- running
---
- running
...
box.info.ro -- false
---
- false
...
-- Wait for remaining rows to arrive.
repeat fiber.sleep(0.01) until box.space.test:count() == 400
---
...
-- Stop replication.
replication = box.cfg.replication
---
...
box.cfg{replication = {}}
---
...
-- Fill the space.
test_run:cmd("switch default")
---
- true
...
fill()
---
...
test_run:cmd("switch replica")
---
- true
...
-- Resume replication.
--
-- Since the sync timeout is very small, the replica won't catch up
-- in time, so box.cfg() will return with the instance in 'orphan'
-- state.
--
box.cfg{replication_sync_lag = 0.001, replication_sync_timeout = 0.001}
---
...
box.cfg{replication = replication}
---
...
box.space.test:count() < 600
---
- true
...
box.info.status -- orphan
---
- orphan
...
box.info.ro -- true
---
- true
...
-- Wait for remaining rows to arrive.
repeat fiber.sleep(0.01) until box.space.test:count() == 600
---
...
-- Make sure replica leaves orphan state.
repeat fiber.sleep(0.01) until box.info.status ~= 'orphan'
---
...
box.info.status -- running
---
- running
...
box.info.ro -- false
---
- false
...
test_run:cmd("switch default")
---
- true
...
test_run:cmd("stop server replica")
---
- true
...
test_run:cmd("cleanup server replica")
---
- true
...
box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
---
- ok
...
box.space.test:drop()
---
...
box.schema.user.revoke('guest', 'replication')
---
...
fiber = require('fiber')
test_run = require('test_run').new()
engine = test_run:get_cfg('engine')
box.schema.user.grant('guest', 'replication')
_ = box.schema.space.create('test', {engine = engine})
_ = box.space.test:create_index('pk')
-- Slow down replication a little to test replication_sync_lag.
box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.001)
-- Helper that adds some records to the space and then starts
-- a fiber to add more records in the background.
test_run:cmd("setopt delimiter ';'")
count = 0;
function fill()
for i = count + 1, count + 100 do
box.space.test:replace{i}
end
fiber.create(function()
for i = count + 101, count + 200 do
box.space.test:replace{i}
fiber.sleep(0.0001)
end
end)
fiber.sleep(0.001)
count = count + 200
end;
test_run:cmd("setopt delimiter ''");
-- Deploy a replica.
test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
test_run:cmd("start server replica")
test_run:cmd("switch replica")
fiber = require('fiber')
-- Stop replication.
replication = box.cfg.replication
box.cfg{replication = {}}
-- Fill the space.
test_run:cmd("switch default")
fill()
test_run:cmd("switch replica")
-- Resume replication.
--
-- Since max allowed lag is small, all records should arrive
-- by the time box.cfg() returns.
--
box.cfg{replication_sync_lag = 0.001}
box.cfg{replication = replication}
box.space.test:count()
box.info.status -- running
box.info.ro -- false
-- Stop replication.
replication = box.cfg.replication
box.cfg{replication = {}}
-- Fill the space.
test_run:cmd("switch default")
fill()
test_run:cmd("switch replica")
-- Resume replication
--
-- Since max allowed lag is big, not all records will arrive
-- upon returning from box.cfg() but the instance won't enter
-- orphan state.
--
box.cfg{replication_sync_lag = 1}
box.cfg{replication = replication}
box.space.test:count() < 400
box.info.status -- running
box.info.ro -- false
-- Wait for remaining rows to arrive.
repeat fiber.sleep(0.01) until box.space.test:count() == 400
-- Stop replication.
replication = box.cfg.replication
box.cfg{replication = {}}
-- Fill the space.
test_run:cmd("switch default")
fill()
test_run:cmd("switch replica")
-- Resume replication.
--
-- Since the sync timeout is very small, the replica won't catch up
-- in time, so box.cfg() will return with the instance in 'orphan'
-- state.
--
box.cfg{replication_sync_lag = 0.001, replication_sync_timeout = 0.001}
box.cfg{replication = replication}
box.space.test:count() < 600
box.info.status -- orphan
box.info.ro -- true
-- Wait for remaining rows to arrive.
repeat fiber.sleep(0.01) until box.space.test:count() == 600
-- Make sure replica leaves orphan state.
repeat fiber.sleep(0.01) until box.info.status ~= 'orphan'
box.info.status -- running
box.info.ro -- false
test_run:cmd("switch default")
test_run:cmd("stop server replica")
test_run:cmd("cleanup server replica")
box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
box.space.test:drop()
box.schema.user.revoke('guest', 'replication')