diff --git a/changelogs/unreleased/gh-9491-last-local-row-tx-boundary.md b/changelogs/unreleased/gh-9491-last-local-row-tx-boundary.md
new file mode 100644
index 0000000000000000000000000000000000000000..340149cf2ed785658b7144c43edd1d5fc6588196
--- /dev/null
+++ b/changelogs/unreleased/gh-9491-last-local-row-tx-boundary.md
@@ -0,0 +1,4 @@
+## bugfix/replication
+
+* Fixed a bug where replication broke with an `ER_PROTOCOL` error when a
+  transaction ended with a local space operation (gh-9491).
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 5d7d6b04a3181f3c77ed9c55fed07b3f37afa642..afa63c9ec143972a940c5788cb7f263f6a94b382 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -248,6 +248,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 	     const struct vclock *stop_vclock)
 {
 	struct xrow_header row;
+	bool is_sending_tx = false;
 	while (xlog_cursor_next_xc(&r->cursor, &row,
 				   r->wal_dir.force_recovery) == 0) {
 		if (++stream->row_count % WAL_ROWS_PER_YIELD == 0) {
@@ -276,15 +277,31 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		 */
 		assert(row.replica_id != 0 || row.group_id == GROUP_LOCAL);
 		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
-		if (row.lsn <= current_lsn)
-			continue; /* already applied, skip */
-		/*
-		 * We can promote the vclock either before or
-		 * after xstream_write(): it only makes any impact
-		 * in case of forced recovery, when we skip the
-		 * failed row anyway.
-		 */
-		vclock_follow_xrow(&r->vclock, &row);
+		if (row.lsn <= current_lsn) {
+			/*
+			 * Skip the already applied row, if it is not needed to
+			 * preserve transaction boundaries (is not the last row
+			 * of a currently recovered transaction). Otherwise,
+			 * replace it with a NOP, so that the transaction end
+			 * flag reaches the receiver, but the data isn't
+			 * recovered twice.
+			 */
+			if (!is_sending_tx || !row.is_commit)
+				continue; /* already applied, skip */
+			row.type = IPROTO_NOP;
+			row.bodycnt = 0;
+			row.body[0].iov_base = NULL;
+			row.body[0].iov_len = 0;
+		} else {
+			/*
+			 * We can promote the vclock either before or
+			 * after xstream_write(): it only makes any impact
+			 * in case of forced recovery, when we skip the
+			 * failed row anyway.
+			 */
+			vclock_follow_xrow(&r->vclock, &row);
+		}
+		is_sending_tx = !row.is_commit;
 		if (xstream_write(stream, &row) != 0) {
 			if (!r->wal_dir.force_recovery)
 				diag_raise();
diff --git a/test/replication-luatest/gh_9491_local_space_tx_boundary_test.lua b/test/replication-luatest/gh_9491_local_space_tx_boundary_test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..77270e96059dcb7aca2acb2eaee0f031803e6fa6
--- /dev/null
+++ b/test/replication-luatest/gh_9491_local_space_tx_boundary_test.lua
@@ -0,0 +1,68 @@
+local t = require('luatest')
+local server = require('luatest.server')
+local replica_set = require('luatest.replica_set')
+
+local g = t.group()
+
+g.before_each(function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.box_cfg = {
+        replication = {
+            server.build_listen_uri('server1', cg.replica_set.id),
+            server.build_listen_uri('server2', cg.replica_set.id),
+        },
+        replication_timeout = 0.1,
+    }
+    cg.servers = {}
+    for i = 1, 2 do
+        cg.servers[i] = cg.replica_set:build_and_add_server{
+            alias = 'server' .. i,
+            box_cfg = cg.box_cfg,
+        }
+    end
+    cg.replica_set:start()
+    cg.replica_set:wait_for_fullmesh()
+    cg.servers[1]:exec(function()
+        box.schema.space.create('test')
+        box.space.test:create_index('pk')
+        box.schema.space.create('loc', {is_local = true})
+        box.space.loc:create_index('pk')
+    end)
+end)
+
+g.after_each(function(cg)
+    cg.replica_set:drop()
+end)
+
+--
+-- gh-9491: make sure that transactions ending with a local row are correctly
+-- recovered and replicated by relay.
+--
+g.test_local_row_tx_boundary = function(cg)
+    -- Stop replication, write some transactions to WAL, then restart
+    -- replication.
+    cg.servers[2]:update_box_cfg{replication = ""}
+    cg.servers[1]:exec(function()
+        box.begin()
+        box.space.test:replace{1}
+        box.space.loc:replace{1}
+        box.commit()
+        box.begin()
+        box.space.test:replace{2}
+        box.space.loc:replace{2}
+        box.commit()
+    end)
+    cg.servers[2]:update_box_cfg{replication = cg.box_cfg.replication}
+    t.helpers.retrying({}, function()
+        cg.servers[2]:assert_follows_upstream(cg.servers[1]:get_instance_id())
+        cg.servers[1]:wait_for_downstream_to(cg.servers[2])
+    end)
+    cg.servers[1]:exec(function(servers2_id)
+        t.assert_equals(box.info.replication[servers2_id].downstream.status,
+                        'follow')
+    end, {cg.servers[2]:get_instance_id()})
+    cg.servers[2]:exec(function()
+        t.assert_equals(box.space.test:select{}, {{1}, {2}})
+        t.assert_equals(box.space.loc:select{}, {})
+    end)
+end
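
Not part of the patch: a minimal sketch of the scenario the fix targets, for reviewers who want to reproduce it by hand. It assumes a master with a replica already attached; the space names `glob` and `loc` are illustrative, not taken from the diff.

```lua
-- Run on the master. The transaction's last WAL row goes to a local space,
-- so the commit row of the transaction is one that is never replicated as-is.
-- Per the recovery.cc hunk above, before the fix such a row could be skipped
-- as already applied, the transaction-end flag was lost, and the replica
-- broke replication with an ER_PROTOCOL error.
box.schema.space.create('glob')
box.space.glob:create_index('pk')
box.schema.space.create('loc', {is_local = true})
box.space.loc:create_index('pk')

box.begin()
box.space.glob:replace{1}  -- replicated row
box.space.loc:replace{1}   -- local row: carries the transaction-end flag
box.commit()
```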