From 1f75231a0c0ce062e08660e0bc51485f7086412c Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> Date: Wed, 29 May 2024 19:24:22 +0200 Subject: [PATCH] relay: do not report vclock[0] anywhere A remote replica's vclock is given to the master so that the master sends data starting from that position. The master does that, but in order to find the relevant position in the local WAL to start from, it must ignore the local rows, i.e. consider them all already "sent". For that, the master replaces the remote vclock[0] with the local vclock[0]. That makes the xlog cursor skip all the local rows. The problem is that this vclock was taken by the relay as is, as if it had truly been reported by the replica. It was even saved as the "last received ACK", which clearly isn't the case. When a real ACK was received, it didn't contain anything in vclock[0], and yet the relay "saw" that the previous ACK had vclock[0] > 0. That looked like the replica went backwards without even closing the connection, which isn't possible. That made the relay crash on an assertion. The fix is not to save the local vclock[0] in the last received ACK. For GC and the xlog cursor the hack is still needed. An alternative that would have made this simpler was to set vclock[0] to INT64_MAX so as to never bother with any local rows at all, but that didn't work: some assumptions in other places seem to depend on having a proper local LSN there. 
Closes #10047 NO_CHANGELOG=the bug wasn't released NO_DOC=bugfix --- src/box/relay.cc | 8 +- src/lib/core/errinj.h | 1 + src/lib/vclock/vclock.h | 7 ++ test/box/errinj.result | 1 + .../gh_10047_local_vclock_downstream_test.lua | 99 +++++++++++++++++++ 5 files changed, 112 insertions(+), 4 deletions(-) create mode 100644 test/replication-luatest/gh_10047_local_vclock_downstream_test.lua diff --git a/src/box/relay.cc b/src/box/relay.cc index 4c550c6b3c..813fca9415 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -552,7 +552,7 @@ relay_final_join(struct replica *replica, struct iostream *io, uint64_t sync, /* * Save the first vclock as 'received'. Because it was really received. */ - vclock_copy(&relay->last_recv_ack.vclock, start_vclock); + vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock); relay->r = recovery_new(wal_dir(), false, start_vclock); vclock_copy(&relay->stop_vclock, stop_vclock); @@ -780,6 +780,7 @@ relay_reader_f(va_list ap) while (!fiber_is_cancelled()) { FiberGCChecker gc_check; struct xrow_header xrow; + ERROR_INJECT_YIELD(ERRINJ_RELAY_READ_ACK_DELAY); coio_read_xrow_timeout_xc(relay->io, &ibuf, &xrow, replication_disconnect_timeout()); xrow_decode_applier_heartbeat_xc(&xrow, last_recv_ack); @@ -1113,11 +1114,10 @@ relay_subscribe(struct replica *replica, struct iostream *io, uint64_t sync, /* * Save the first vclock as 'received'. Because it was really received. 
*/ - vclock_copy(&relay->last_recv_ack.vclock, start_vclock); + vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock); relay->r = recovery_new(wal_dir(), false, start_vclock); - vclock_copy(&relay->tx.vclock, start_vclock); + vclock_copy_ignore0(&relay->tx.vclock, start_vclock); relay->version_id = replica_version_id; - relay->id_filter |= replica_id_filter; struct cord cord; diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h index b277159754..7783106975 100644 --- a/src/lib/core/errinj.h +++ b/src/lib/core/errinj.h @@ -121,6 +121,7 @@ struct errinj { _(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \ _(ERRINJ_RELAY_WAL_START_DELAY, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_RELAY_READ_ACK_DELAY, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_REPLICASET_VCLOCK, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_REPLICA_JOIN_DELAY, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_SIGILL_MAIN_THREAD, ERRINJ_BOOL, {.bparam = false}) \ diff --git a/src/lib/vclock/vclock.h b/src/lib/vclock/vclock.h index 6034259b3f..85aa011bee 100644 --- a/src/lib/vclock/vclock.h +++ b/src/lib/vclock/vclock.h @@ -202,6 +202,13 @@ vclock_copy(struct vclock *dst, const struct vclock *src) sizeof(*dst->lsn) * max_pos); } +static inline void +vclock_copy_ignore0(struct vclock *dst, const struct vclock *src) +{ + vclock_copy(dst, src); + vclock_reset(dst, 0, 0); +} + static inline uint32_t vclock_size(const struct vclock *vclock) { diff --git a/test/box/errinj.result b/test/box/errinj.result index e31d2317c7..df5a3f06f2 100644 --- a/test/box/errinj.result +++ b/test/box/errinj.result @@ -90,6 +90,7 @@ evals - ERRINJ_RELAY_FINAL_JOIN: false - ERRINJ_RELAY_FINAL_SLEEP: false - ERRINJ_RELAY_FROM_TX_DELAY: false + - ERRINJ_RELAY_READ_ACK_DELAY: false - ERRINJ_RELAY_REPORT_INTERVAL: 0 - ERRINJ_RELAY_SEND_DELAY: false - ERRINJ_RELAY_TIMEOUT: 0 diff --git a/test/replication-luatest/gh_10047_local_vclock_downstream_test.lua 
b/test/replication-luatest/gh_10047_local_vclock_downstream_test.lua new file mode 100644 index 0000000000..c796b0d968 --- /dev/null +++ b/test/replication-luatest/gh_10047_local_vclock_downstream_test.lua @@ -0,0 +1,99 @@ +local t = require('luatest') +local server = require('luatest.server') +local replica_set = require('luatest.replica_set') + +local g = t.group('gh_10047') +local wait_timeout = 10 + +g.before_all(function(cg) + t.tarantool.skip_if_not_debug() + cg.replica_set = replica_set:new({}) + cg.replication = { + server.build_listen_uri('server1', cg.replica_set.id), + } + cg.server1 = cg.replica_set:build_and_add_server{ + alias = 'server1', + box_cfg = { + replication_timeout = 0.1, + }, + } + cg.server2 = cg.replica_set:build_and_add_server{ + alias = 'server2', + box_cfg = { + replication_timeout = 0.1, + replication = server.build_listen_uri('server1', cg.replica_set.id), + }, + } + cg.replica_set:start() + cg.server1:exec(function() + local s1 = box.schema.create_space('test') + s1:create_index('pk') + local s2 = box.schema.create_space('test_loc', {is_local = true}) + s2:create_index('pk') + end) +end) + +g.after_all(function(cg) + cg.replica_set:drop() +end) + +-- +-- gh-10047: relay used to save local vclock[0] as the last received ACK from +-- the replica which just subscribed and didn't send any real ACKs. When a real +-- ACK was received, it didn't have vclock[0] and it looked like vclock[0] went +-- backwards. That broke an assert in relay. +-- +g.test_downstream_vclock_no_local = function(cg) + -- Make sure there is a local row on master and vclock isn't empty. + cg.server1:exec(function() + box.space.test:replace{1} + box.space.test_loc:replace{1} + end) + cg.server2:wait_for_vclock_of(cg.server1) + local server2_id = cg.server2:get_instance_id() + cg.server2:stop() + cg.server1:exec(function() + -- On restart the replica's ACKs are not received for a while. 
Need to + -- catch the moment when the subscribe vclock appears in + -- info.replication and it isn't yet overridden by a real ACK. + box.error.injection.set("ERRINJ_RELAY_READ_ACK_DELAY", true) + -- While the replica is away, the master moves a bit. To make replica's + -- vclock smaller so as it would receive actual data after subscribe and + -- send a real ACK (not just an empty heartbeat). + box.space.test:replace{2} + end) + cg.server2:start() + cg.server1:exec(function(id, timeout) + local fiber = require('fiber') + -- Wait until subscribe is done. + t.helpers.retrying({timeout = timeout}, function() + local info = box.info.replication[id] + if info and info.downstream and info.downstream.vclock and + next(info.downstream.vclock) then + t.assert_equals(info.downstream.vclock[0], nil) + return + end + error("No subscribe from the replica") + end) + -- When subscribe is just finished, relay has subscribe vclock saved as + -- last-ACK. But the crash was triggered when before-last-ACK was >= + -- last-ACK. The latter becomes the former when relay exchanges status + -- messages with TX thread. + -- + -- Hence need to wait until the TX thread gets notified by the relay + -- about anything. + t.helpers.retrying({timeout = timeout}, function() + local ack_period = box.cfg.replication_timeout + fiber.sleep(ack_period) + local info = box.info.replication[id] + if info and info.downstream and info.downstream.idle and + info.downstream.idle < ack_period then + return + end + error("No report to TX thread") + end) + -- Receive a real ACK. It must be >= subscribe vclock or master dies. + box.error.injection.set("ERRINJ_RELAY_READ_ACK_DELAY", false) + end, {server2_id, wait_timeout}) + cg.server2:wait_for_vclock_of(cg.server1) +end -- GitLab