diff --git a/src/box/box.cc b/src/box/box.cc
index 566cb41efa3c194f3b5083fcb919117a58b589ad..18568df3bf5c11a3462a5ebfb36df3b14840db21 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -152,6 +152,11 @@ static struct fiber_pool tx_fiber_pool;
  * are too many messages in flight (gh-1892).
  */
 static struct cbus_endpoint tx_prio_endpoint;
+/**
+ * A trigger executed each time the Raft state machine updates any
+ * of its visible attributes.
+ */
+static struct trigger box_raft_on_update;
 
 void
 box_update_ro_summary(void)
@@ -1005,7 +1010,7 @@ box_set_replication_anon(void)
 }
 
 void
-box_clear_synchro_queue(void)
+box_clear_synchro_queue(bool try_wait)
 {
 	if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
 		return;
@@ -1014,13 +1019,15 @@ box_clear_synchro_queue(void)
 	if (former_leader_id == instance_id)
 		return;
 
-	/* Wait until pending confirmations/rollbacks reach us. */
-	double timeout = 2 * replication_synchro_timeout;
-	double start_tm = fiber_clock();
-	while (!txn_limbo_is_empty(&txn_limbo)) {
-		if (fiber_clock() - start_tm > timeout)
-			break;
-		fiber_sleep(0.001);
+	if (try_wait) {
+		/* Wait until pending confirmations/rollbacks reach us. */
+		double timeout = 2 * replication_synchro_timeout;
+		double start_tm = fiber_clock();
+		while (!txn_limbo_is_empty(&txn_limbo)) {
+			if (fiber_clock() - start_tm > timeout)
+				break;
+			fiber_sleep(0.001);
+		}
 	}
 
 	if (!txn_limbo_is_empty(&txn_limbo)) {
@@ -1053,6 +1060,22 @@ box_clear_synchro_queue(void)
 	}
 }
 
+static int
+box_raft_on_update_f(struct trigger *trigger, void *event)
+{
+	(void)trigger;
+	(void)event;
+	if (raft.state != RAFT_STATE_LEADER)
+		return 0;
+	/*
+	 * When the node becomes a leader, it ignores all records from the
+	 * other nodes and won't receive late CONFIRM messages anyway, so the
+	 * queue can be cleared without waiting for confirmations.
+	 */
+	box_clear_synchro_queue(false);
+	return 0;
+}
+
 void
 box_listen(void)
 {
@@ -2633,6 +2656,9 @@ box_init(void)
 	txn_limbo_init();
 	sequence_init();
 	raft_init();
+
+	trigger_create(&box_raft_on_update, box_raft_on_update_f, NULL, NULL);
+	raft_on_update(&box_raft_on_update);
 }
 
 bool
diff --git a/src/box/box.h b/src/box/box.h
index a151fb8f1d281501193c1aab614172ef6d2a0afd..b47a220b7d225b4fb6f3e872057ce5eddfdd21f1 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -266,7 +266,8 @@ extern "C" {
 
 typedef struct tuple box_tuple_t;
 
-void box_clear_synchro_queue(void);
+void
+box_clear_synchro_queue(bool try_wait);
 
 /* box_select is private and used only by FFI */
 API_EXPORT int
diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index 2017ddc183e86fb2a041ba3a152bcea6d18ce1bc..bf26465e60f3fcaa14b7638a15fbadb36503a732 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -82,7 +82,7 @@ static int
 lbox_ctl_clear_synchro_queue(struct lua_State *L)
 {
 	(void) L;
-	box_clear_synchro_queue();
+	box_clear_synchro_queue(true);
 	return 0;
 }
 
diff --git a/test/replication/election_qsync.result b/test/replication/election_qsync.result
new file mode 100644
index 0000000000000000000000000000000000000000..086b1768698f6073e97a1e7113db082210ec81cb
--- /dev/null
+++ b/test/replication/election_qsync.result
@@ -0,0 +1,154 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+box.schema.user.grant('guest', 'super')
+ | ---
+ | ...
+
+old_election_mode = box.cfg.election_mode
+ | ---
+ | ...
+old_replication_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+old_replication_timeout = box.cfg.replication_timeout
+ | ---
+ | ...
+old_replication = box.cfg.replication
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica with wait=True, wait_load=True')
+ | ---
+ | - true
+ | ...
+-- Any election activity requires a full mesh.
+box.cfg{replication = test_run:eval('replica', 'box.cfg.listen')[1]}
+ | ---
+ | ...
+
+--
+-- gh-5339: leader election manages transaction limbo automatically.
+--
+-- The idea of the test is that there are 2 nodes: a leader and
+-- a follower. The leader creates a synchronous transaction,
+-- which is replicated to the follower, and then the leader
+-- dies. When the follower is elected as the new leader, it
+-- should finish the pending transaction.
+--
+_ = box.schema.create_space('test', {is_sync = true})
+ | ---
+ | ...
+_ = _:create_index('pk')
+ | ---
+ | ...
+box.cfg{election_mode = 'voter'}
+ | ---
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+-- Replication timeout is small to speed up the first election start.
+box.cfg{ \
+    election_mode = 'candidate', \
+    replication_synchro_quorum = 3, \
+    replication_synchro_timeout = 1000000, \
+    replication_timeout = 0.1, \
+}
+ | ---
+ | ...
+
+test_run:wait_cond(function() return box.info.election.state == 'leader' end)
+ | ---
+ | - true
+ | ...
+lsn = box.info.lsn
+ | ---
+ | ...
+_ = fiber.create(function() \
+    ok, err = pcall(box.space.test.replace, box.space.test, {1}) \
+end)
+ | ---
+ | ...
+-- Wait for the WAL write.
+test_run:wait_cond(function() return box.info.lsn > lsn end)
+ | ---
+ | - true
+ | ...
+-- Wait for replication to the other instance.
+test_run:wait_lsn('default', 'replica')
+ | ---
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+-- Will fail - the node is not a leader.
+box.space.test:replace{2}
+ | ---
+ | - error: Can't modify data because this instance is in read-only mode.
+ | ...
+
+-- Set the synchro timeout to a huge value to ensure that, when a leader is
+-- elected, it won't wait for this timeout.
+box.cfg{replication_synchro_timeout = 1000000}
+ | ---
+ | ...
+
+-- Configure this separately from the synchro timeout so as not to depend on
+-- the order in which the synchro and election options are applied. The
+-- replication timeout is tiny to speed up noticing the old leader's death.
+box.cfg{ \
+    election_mode = 'candidate', \
+    replication_timeout = 0.01, \
+}
+ | ---
+ | ...
+
+test_run:wait_cond(function() return box.info.election.state == 'leader' end)
+ | ---
+ | - true
+ | ...
+_ = box.space.test:replace{2}
+ | ---
+ | ...
+box.space.test:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.cfg{ \
+    election_mode = old_election_mode, \
+    replication_timeout = old_replication_timeout, \
+    replication = old_replication, \
+    replication_synchro_timeout = old_replication_synchro_timeout, \
+}
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'super')
+ | ---
+ | ...
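Note on the Lua-visible behavior: box.ctl.clear_synchro_queue() keeps its old semantics after this patch. It now maps to box_clear_synchro_queue(true), so it still waits up to 2 * replication_synchro_timeout for pending CONFIRM/ROLLBACK records before finishing the queue, while the new Raft trigger calls the no-wait variant automatically when an instance becomes the leader. Below is a minimal sketch of manual promotion on a cluster that does not use elections; the space name 'test' and the timeout value are illustrative assumptions, not part of this patch.

    -- On the instance being promoted, after the old leader is gone.
    -- Bound the wait for late CONFIRM/ROLLBACK records from the old leader.
    box.cfg{replication_synchro_timeout = 10}
    -- Waits up to 2 * replication_synchro_timeout, then finishes whatever is
    -- still pending in the limbo on behalf of the old leader.
    box.ctl.clear_synchro_queue()
    -- New synchronous transactions now go through this instance's own limbo.
    box.space.test:replace{1}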
diff --git a/test/replication/election_qsync.test.lua b/test/replication/election_qsync.test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..6a80f48595ecd41b0a1c8d454087f51129bdb2f6
--- /dev/null
+++ b/test/replication/election_qsync.test.lua
@@ -0,0 +1,77 @@
+test_run = require('test_run').new()
+box.schema.user.grant('guest', 'super')
+
+old_election_mode = box.cfg.election_mode
+old_replication_synchro_timeout = box.cfg.replication_synchro_timeout
+old_replication_timeout = box.cfg.replication_timeout
+old_replication = box.cfg.replication
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+test_run:cmd('start server replica with wait=True, wait_load=True')
+-- Any election activity requires a full mesh.
+box.cfg{replication = test_run:eval('replica', 'box.cfg.listen')[1]}
+
+--
+-- gh-5339: leader election manages transaction limbo automatically.
+--
+-- The idea of the test is that there are 2 nodes: a leader and
+-- a follower. The leader creates a synchronous transaction,
+-- which is replicated to the follower, and then the leader
+-- dies. When the follower is elected as the new leader, it
+-- should finish the pending transaction.
+--
+_ = box.schema.create_space('test', {is_sync = true})
+_ = _:create_index('pk')
+box.cfg{election_mode = 'voter'}
+
+test_run:switch('replica')
+fiber = require('fiber')
+-- Replication timeout is small to speed up the first election start.
+box.cfg{ \
+    election_mode = 'candidate', \
+    replication_synchro_quorum = 3, \
+    replication_synchro_timeout = 1000000, \
+    replication_timeout = 0.1, \
+}
+
+test_run:wait_cond(function() return box.info.election.state == 'leader' end)
+lsn = box.info.lsn
+_ = fiber.create(function() \
+    ok, err = pcall(box.space.test.replace, box.space.test, {1}) \
+end)
+-- Wait for the WAL write.
+test_run:wait_cond(function() return box.info.lsn > lsn end)
+-- Wait for replication to the other instance.
+test_run:wait_lsn('default', 'replica')
+
+test_run:switch('default')
+test_run:cmd('stop server replica')
+-- Will fail - the node is not a leader.
+box.space.test:replace{2}
+
+-- Set the synchro timeout to a huge value to ensure that, when a leader is
+-- elected, it won't wait for this timeout.
+box.cfg{replication_synchro_timeout = 1000000}
+
+-- Configure this separately from the synchro timeout so as not to depend on
+-- the order in which the synchro and election options are applied. The
+-- replication timeout is tiny to speed up noticing the old leader's death.
+box.cfg{ \
+    election_mode = 'candidate', \
+    replication_timeout = 0.01, \
+}
+
+test_run:wait_cond(function() return box.info.election.state == 'leader' end)
+_ = box.space.test:replace{2}
+box.space.test:select{}
+box.space.test:drop()
+
+test_run:cmd('delete server replica')
+box.cfg{ \
+    election_mode = old_election_mode, \
+    replication_timeout = old_replication_timeout, \
+    replication = old_replication, \
+    replication_synchro_timeout = old_replication_synchro_timeout, \
+}
+box.schema.user.revoke('guest', 'super')
diff --git a/test/replication/election_qsync_stress.result b/test/replication/election_qsync_stress.result
index 9497b37bf105c1ab247fee4be44205500ec3fa98..bb419c3bf17bdfa160c9e53daee87136b2981c56 100644
--- a/test/replication/election_qsync_stress.result
+++ b/test/replication/election_qsync_stress.result
@@ -93,7 +93,6 @@ for i = 1,10 do
     new_leader = 'election_replica'..new_leader_nr
     leader_port = test_run:eval(new_leader, 'box.cfg.listen')[1]
     c = netbox.connect(leader_port)
-    c:eval('box.ctl.clear_synchro_queue()')
     c:eval('box.cfg{replication_synchro_timeout=1000}')
     c.space._schema:replace{'smth'}
     c.space.test:get{i}
diff --git a/test/replication/election_qsync_stress.test.lua b/test/replication/election_qsync_stress.test.lua
index bca1b20c72c3fb50f9ba9083349a6449555daa52..2f379efdb9db409a3264dfc3daee5efabd2e89ee 100644
--- a/test/replication/election_qsync_stress.test.lua
+++ b/test/replication/election_qsync_stress.test.lua
@@ -57,7 +57,6 @@ for i = 1,10 do
     new_leader = 'election_replica'..new_leader_nr
     leader_port = test_run:eval(new_leader, 'box.cfg.listen')[1]
     c = netbox.connect(leader_port)
-    c:eval('box.ctl.clear_synchro_queue()')
     c:eval('box.cfg{replication_synchro_timeout=1000}')
     c.space._schema:replace{'smth'}
     c.space.test:get{i}
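The explicit c:eval('box.ctl.clear_synchro_queue()') call is dropped from the stress test because of the core change above: as soon as an instance wins the election, box_raft_on_update_f clears the synchro queue without waiting for confirmations. A rough sketch of the pattern the stress test now relies on is shown below; the netbox connection, leader_port, and the space names are carried over from the stress test as assumptions, not new API.

    netbox = require('net.box')
    -- Connect to whichever instance won the election (leader_port is obtained
    -- the same way as in the stress test loop).
    c = netbox.connect(leader_port)
    test_run:wait_cond(function() return c:eval('return box.info.election.state') == 'leader' end)
    -- No manual c:eval('box.ctl.clear_synchro_queue()') is needed anymore: the
    -- new leader has already cleared the limbo inherited from the dead leader,
    -- so a plain synchronous write goes through.
    c.space._schema:replace{'smth'}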