Skip to content
Snippets Groups Projects
Commit 195d4462 authored by Georgy Kirichenko's avatar Georgy Kirichenko Committed by Konstantin Osipov
Browse files

Send relay heartbeat if WAL changes won't be sent

If a replica receives some changes, the corresponding WAL events are
generated and the relay fiber is woken up before the heartbeat timeout.
But there may be nothing to send if all the changes come from the current
relay peer. In this case the applier doesn't receive anything and breaks
the connection.

Fixes #3160
parent 64faad37
No related branches found
No related tags found
No related merge requests found
...@@ -124,6 +124,8 @@ struct relay { ...@@ -124,6 +124,8 @@ struct relay {
* confirmation from the replica. * confirmation from the replica.
*/ */
struct stailq pending_gc; struct stailq pending_gc;
/** Time when last row was sent to peer. */
double last_row_tm;
struct { struct {
/* Align to prevent false-sharing with tx thread */ /* Align to prevent false-sharing with tx thread */
...@@ -433,13 +435,12 @@ relay_subscribe_f(va_list ap) ...@@ -433,13 +435,12 @@ relay_subscribe_f(va_list ap)
fiber_start(reader, relay, fiber()); fiber_start(reader, relay, fiber());
/* /*
* If the replica happens to be uptodate on subscribe, * If the replica happens to be up to date on subscribe,
* don't wait for timeout to happen - send a heartbeat * don't wait for timeout to happen - send a heartbeat
* message right away to update the replication lag as * message right away to update the replication lag as
* soon as possible. * soon as possible.
*/ */
if (vclock_compare(&r->vclock, &replicaset.vclock) == 0) relay_send_heartbeat(relay);
relay_send_heartbeat(relay);
while (!fiber_is_cancelled()) { while (!fiber_is_cancelled()) {
double timeout = replication_timeout; double timeout = replication_timeout;
...@@ -448,14 +449,8 @@ relay_subscribe_f(va_list ap) ...@@ -448,14 +449,8 @@ relay_subscribe_f(va_list ap)
if (inj != NULL && inj->dparam != 0) if (inj != NULL && inj->dparam != 0)
timeout = inj->dparam; timeout = inj->dparam;
if (fiber_cond_wait_timeout(&relay->reader_cond, timeout) != 0) { fiber_cond_wait_deadline(&relay->reader_cond,
/* relay->last_row_tm + timeout);
* Timed out waiting for WAL events.
* Send a heartbeat message to update
* the replication lag on the slave.
*/
relay_send_heartbeat(relay);
}
/* /*
* The fiber can be woken by IO cancel, by a timeout of * The fiber can be woken by IO cancel, by a timeout of
...@@ -463,6 +458,9 @@ relay_subscribe_f(va_list ap) ...@@ -463,6 +458,9 @@ relay_subscribe_f(va_list ap)
* Handle cbus messages first. * Handle cbus messages first.
*/ */
cbus_process(&relay->endpoint); cbus_process(&relay->endpoint);
/* Check for a heartbeat timeout. */
if (ev_monotonic_now(loop()) - relay->last_row_tm > timeout)
relay_send_heartbeat(relay);
/* /*
* Check that the vclock has been updated and the previous * Check that the vclock has been updated and the previous
* status message is delivered * status message is delivered
...@@ -560,6 +558,7 @@ static void ...@@ -560,6 +558,7 @@ static void
relay_send(struct relay *relay, struct xrow_header *packet) relay_send(struct relay *relay, struct xrow_header *packet)
{ {
packet->sync = relay->sync; packet->sync = relay->sync;
relay->last_row_tm = ev_monotonic_now(loop());
coio_write_xrow(&relay->io, packet); coio_write_xrow(&relay->io, packet);
fiber_gc(); fiber_gc();
......
...@@ -61,6 +61,87 @@ test_run:cmd('cleanup server test') ...@@ -61,6 +61,87 @@ test_run:cmd('cleanup server test')
box.cfg{read_only = false} box.cfg{read_only = false}
--- ---
... ...
-- gh-3160 - Send heartbeats if there are changes from a remote master only
SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
---
...
-- Deploy a cluster.
test_run:create_cluster(SERVERS)
---
...
test_run:wait_fullmesh(SERVERS)
---
...
test_run:cmd("switch autobootstrap1")
---
- true
...
test_run = require('test_run').new()
---
...
box.cfg{replication_timeout = 0.01}
---
...
test_run:cmd("switch autobootstrap2")
---
- true
...
test_run = require('test_run').new()
---
...
box.cfg{replication_timeout = 0.01}
---
...
test_run:cmd("switch autobootstrap3")
---
- true
...
test_run = require('test_run').new()
---
...
fiber=require('fiber')
---
...
box.cfg{replication_timeout = 0.01}
---
...
_ = box.schema.space.create('test_timeout'):create_index('pk')
---
...
test_run:cmd("setopt delimiter ';'")
---
- true
...
-- Replace a tuple in space 'test_timeout' 100 times, sleeping 0.005 s
-- after each write, and after every iteration check each entry of
-- box.info.replication: any entry that has an `upstream` field must
-- report status 'follow'. Entries without an `upstream` field are skipped.
-- All entries are inspected, whatever their ids, instead of ids 1..3 only.
-- Returns true when every check passed; raises 'Replication broken' as
-- soon as an upstream is found in any other state.
function test_timeout()
    for _ = 1, 100 do
        box.space.test_timeout:replace({1})
        fiber.sleep(0.005)
        for _, replica in pairs(box.info.replication) do
            local upstream = replica.upstream
            if upstream and upstream.status ~= 'follow' then
                return error('Replication broken')
            end
        end
    end
    return true
end ;
---
...
test_run:cmd("setopt delimiter ''");
---
- true
...
test_timeout()
---
- true
...
test_run:cmd("switch default")
---
- true
...
test_run:drop_cluster(SERVERS)
---
...
box.schema.user.revoke('guest', 'replication') box.schema.user.revoke('guest', 'replication')
--- ---
... ...
...@@ -22,4 +22,40 @@ test_run:cmd('stop server test') ...@@ -22,4 +22,40 @@ test_run:cmd('stop server test')
test_run:cmd('cleanup server test') test_run:cmd('cleanup server test')
box.cfg{read_only = false} box.cfg{read_only = false}
-- gh-3160 - Send heartbeats if there are changes from a remote master only
SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
-- Deploy a cluster.
test_run:create_cluster(SERVERS)
test_run:wait_fullmesh(SERVERS)
test_run:cmd("switch autobootstrap1")
test_run = require('test_run').new()
box.cfg{replication_timeout = 0.01}
test_run:cmd("switch autobootstrap2")
test_run = require('test_run').new()
box.cfg{replication_timeout = 0.01}
test_run:cmd("switch autobootstrap3")
test_run = require('test_run').new()
fiber=require('fiber')
box.cfg{replication_timeout = 0.01}
_ = box.schema.space.create('test_timeout'):create_index('pk')
test_run:cmd("setopt delimiter ';'")
-- Replace a tuple in space 'test_timeout' 100 times, sleeping 0.005 s
-- after each write, and after every iteration check each entry of
-- box.info.replication: any entry that has an `upstream` field must
-- report status 'follow'. Entries without an `upstream` field are skipped.
-- All entries are inspected, whatever their ids, instead of ids 1..3 only.
-- Returns true when every check passed; raises 'Replication broken' as
-- soon as an upstream is found in any other state.
function test_timeout()
    for _ = 1, 100 do
        box.space.test_timeout:replace({1})
        fiber.sleep(0.005)
        for _, replica in pairs(box.info.replication) do
            local upstream = replica.upstream
            if upstream and upstream.status ~= 'follow' then
                return error('Replication broken')
            end
        end
    end
    return true
end ;
test_run:cmd("setopt delimiter ''");
test_timeout()
test_run:cmd("switch default")
test_run:drop_cluster(SERVERS)
box.schema.user.revoke('guest', 'replication') box.schema.user.revoke('guest', 'replication')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment