diff --git a/src/box/relay.cc b/src/box/relay.cc index 4c550c6b3c65aed1d27fb8b4a0692c67f06863c6..813fca94158c75f59e835cc1724037b4cca3ea39 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -552,7 +552,7 @@ relay_final_join(struct replica *replica, struct iostream *io, uint64_t sync, /* * Save the first vclock as 'received'. Because it was really received. */ - vclock_copy(&relay->last_recv_ack.vclock, start_vclock); + vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock); relay->r = recovery_new(wal_dir(), false, start_vclock); vclock_copy(&relay->stop_vclock, stop_vclock); @@ -780,6 +780,7 @@ relay_reader_f(va_list ap) while (!fiber_is_cancelled()) { FiberGCChecker gc_check; struct xrow_header xrow; + ERROR_INJECT_YIELD(ERRINJ_RELAY_READ_ACK_DELAY); coio_read_xrow_timeout_xc(relay->io, &ibuf, &xrow, replication_disconnect_timeout()); xrow_decode_applier_heartbeat_xc(&xrow, last_recv_ack); @@ -1113,11 +1114,10 @@ relay_subscribe(struct replica *replica, struct iostream *io, uint64_t sync, /* * Save the first vclock as 'received'. Because it was really received. */ - vclock_copy(&relay->last_recv_ack.vclock, start_vclock); + vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock); relay->r = recovery_new(wal_dir(), false, start_vclock); - vclock_copy(&relay->tx.vclock, start_vclock); + vclock_copy_ignore0(&relay->tx.vclock, start_vclock); relay->version_id = replica_version_id; - relay->id_filter |= replica_id_filter; struct cord cord; diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h index b277159754d8c89f62f8f8954e912b375b621226..7783106975df0e69de0ce685f99cc7390a997aec 100644 --- a/src/lib/core/errinj.h +++ b/src/lib/core/errinj.h @@ -121,6 +121,7 @@ struct errinj { _(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \ _(ERRINJ_RELAY_WAL_START_DELAY, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_RELAY_READ_ACK_DELAY, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_REPLICASET_VCLOCK, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_REPLICA_JOIN_DELAY, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_SIGILL_MAIN_THREAD, ERRINJ_BOOL, {.bparam = false}) \ diff --git a/src/lib/vclock/vclock.h b/src/lib/vclock/vclock.h index 6034259b3f0837ec63c0ecaf278a4f6cdb8305e3..85aa011beecdd272ae6ca412dfd2d367928930b5 100644 --- a/src/lib/vclock/vclock.h +++ b/src/lib/vclock/vclock.h @@ -202,6 +202,13 @@ vclock_copy(struct vclock *dst, const struct vclock *src) sizeof(*dst->lsn) * max_pos); } +static inline void +vclock_copy_ignore0(struct vclock *dst, const struct vclock *src) +{ + vclock_copy(dst, src); + vclock_reset(dst, 0, 0); +} + static inline uint32_t vclock_size(const struct vclock *vclock) { diff --git a/test/box/errinj.result b/test/box/errinj.result index e31d2317c7673df2da67de3730df123555ab3c2d..df5a3f06f253d294158288a44d00a2a1b86f5d9a 100644 --- a/test/box/errinj.result +++ b/test/box/errinj.result @@ -90,6 +90,7 @@ evals - ERRINJ_RELAY_FINAL_JOIN: false - ERRINJ_RELAY_FINAL_SLEEP: false - ERRINJ_RELAY_FROM_TX_DELAY: false + - ERRINJ_RELAY_READ_ACK_DELAY: false - ERRINJ_RELAY_REPORT_INTERVAL: 0 - ERRINJ_RELAY_SEND_DELAY: false - ERRINJ_RELAY_TIMEOUT: 0 diff --git a/test/replication-luatest/gh_10047_local_vclock_downstream_test.lua b/test/replication-luatest/gh_10047_local_vclock_downstream_test.lua new file mode 100644 index 0000000000000000000000000000000000000000..c796b0d968286cc1964fbe45265d52aea75c07dd --- /dev/null +++ b/test/replication-luatest/gh_10047_local_vclock_downstream_test.lua @@ -0,0 +1,99 @@ +local t = require('luatest') +local server = require('luatest.server') +local replica_set = require('luatest.replica_set') + +local g = t.group('gh_10047') +local wait_timeout = 10 + +g.before_all(function(cg) + t.tarantool.skip_if_not_debug() + cg.replica_set = replica_set:new({}) + cg.replication = { + server.build_listen_uri('server1', cg.replica_set.id), + } + cg.server1 = cg.replica_set:build_and_add_server{ + alias = 'server1', + box_cfg = { + replication_timeout = 0.1, + }, + } + cg.server2 = cg.replica_set:build_and_add_server{ + alias = 'server2', + box_cfg = { + replication_timeout = 0.1, + replication = server.build_listen_uri('server1', cg.replica_set.id), + }, + } + cg.replica_set:start() + cg.server1:exec(function() + local s1 = box.schema.create_space('test') + s1:create_index('pk') + local s2 = box.schema.create_space('test_loc', {is_local = true}) + s2:create_index('pk') + end) +end) + +g.after_all(function(cg) + cg.replica_set:drop() +end) + +-- +-- gh-10047: relay used to save local vclock[0] as the last received ACK from +-- the replica which just subscribed and didn't send any real ACKs. When a real +-- ACK was received, it didn't have vclock[0] and it looked like vclock[0] went +-- backwards. That broke an assert in relay. +-- +g.test_downstream_vclock_no_local = function(cg) + -- Make sure there is a local row on master and vclock isn't empty. + cg.server1:exec(function() + box.space.test:replace{1} + box.space.test_loc:replace{1} + end) + cg.server2:wait_for_vclock_of(cg.server1) + local server2_id = cg.server2:get_instance_id() + cg.server2:stop() + cg.server1:exec(function() + -- On restart the replica's ACKs are not received for a while. Need to + -- catch the moment when the subscribe vclock appears in + -- info.replication and it isn't yet overridden by a real ACK. + box.error.injection.set("ERRINJ_RELAY_READ_ACK_DELAY", true) + -- While the replica is away, the master moves a bit. To make replica's + -- vclock smaller so as it would receive actual data after subscribe and + -- send a real ACK (not just an empty heartbeat). + box.space.test:replace{2} + end) + cg.server2:start() + cg.server1:exec(function(id, timeout) + local fiber = require('fiber') + -- Wait until subscribe is done. + t.helpers.retrying({timeout = timeout}, function() + local info = box.info.replication[id] + if info and info.downstream and info.downstream.vclock and + next(info.downstream.vclock) then + t.assert_equals(info.downstream.vclock[0], nil) + return + end + error("No subscribe from the replica") + end) + -- When subscribe is just finished, relay has subscribe vclock saved as + -- last-ACK. But the crash was triggered when before-last-ACK was >= + -- last-ACK. The latter becomes the former when relay exchanges status + -- messages with TX thread. + -- + -- Hence need to wait until the TX thread gets notified by the relay + -- about anything. + t.helpers.retrying({timeout = timeout}, function() + local ack_period = box.cfg.replication_timeout + fiber.sleep(ack_period) + local info = box.info.replication[id] + if info and info.downstream and info.downstream.idle and + info.downstream.idle < ack_period then + return + end + error("No report to TX thread") + end) + -- Receive a real ACK. It must be >= subscribe vclock or master dies. + box.error.injection.set("ERRINJ_RELAY_READ_ACK_DELAY", false) + end, {server2_id, wait_timeout}) + cg.server2:wait_for_vclock_of(cg.server1) +end