From 60d457651da7ad36b5485e133ce05265a18935cf Mon Sep 17 00:00:00 2001
From: Serge Petrenko <sergepetrenko@tarantool.org>
Date: Tue, 30 Jan 2024 18:29:00 +0300
Subject: [PATCH] replication: fix ER_PROTOCOL in relay

We've had numerous problems with transaction boundaries in replication.
They were mostly caused by cases where either the beginning or the end
of a transaction happened to be a local row. Local rows are not
replicated, so the peer saw "corrupted" transactions with either no
beginning or no end flag, even though the transaction contents were
fine.

The problem with starting a transaction with a local row was solved in
commit f41d1ddd5faf ("wal: fix tx boundaries"), and that fix seems to
continue working fine to this day.

The problem with ending a transaction with a local row was first fixed
in commit 25382617b957 ("replication: append NOP as the last tx row").
However, that approach had a problem of its own: when a user wrote to
local spaces on a replica from a replication trigger, it became
impossible to ever start replicating from the replica back to the
master.

Another fix was proposed: in commit f96782b53214 ("relay: send rows
transactionally") we made relay read a full transaction into memory and
then send it all at once, adjusting the transaction start and end flags
when necessary.

After that the NOPs were removed in commit f5e52b2c2cf5 ("box: get rid
of dummy NOPs after transactions ending with local rows"), since relay
became capable of fixing transaction boundaries itself.

It turns out the assumption that relay always sees a full transaction
and thus can set the transaction boundaries correctly is wrong: when a
replica reconnects to the master, we set its starting vclock[0] to the
one the master has at the moment of reconnect, so when recovery reads
local rows with lsns less than vclock[0], it silently skips them
without showing them to relay. When such a skipped row carries the
is_commit flag of a currently sent transaction, we get the same problem
as described above.
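
To make the failure mode concrete, here is a standalone toy model of
the old filtering (not Tarantool code: toy_row and filter_pre_fix are
invented names, and the per-component vclock check is collapsed into a
single current_lsn):

    #include <stdbool.h>
    #include <stdio.h>

    /* A toy row: only the fields that matter for the boundary problem. */
    struct toy_row {
        long lsn;
        bool is_commit;
    };

    /*
     * Pre-fix filtering, modelled: any row with lsn <= current_lsn is
     * dropped as "already applied". If the dropped row is the local
     * last row of a transaction, its is_commit flag is dropped with it.
     */
    static void
    filter_pre_fix(const struct toy_row *rows, int n, long current_lsn)
    {
        for (int i = 0; i < n; i++) {
            if (rows[i].lsn <= current_lsn)
                continue; /* already applied, flag lost too */
            printf("forward lsn=%ld is_commit=%d\n",
                   rows[i].lsn, (int)rows[i].is_commit);
        }
    }

    int
    main(void)
    {
        /* A global row followed by a local row that ends the transaction;
         * the local row's lsn is below the replica's starting vclock. */
        struct toy_row tx[] = {{11, false}, {5, true}};
        filter_pre_fix(tx, 2, 10);
        /* Output: only lsn=11 with is_commit=0 -- the transaction never
         * gets an end flag on the receiving side. */
        return 0;
    }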

Let's make recovery track whether it has already pushed any rows of the
current transaction to relay, and if it has, recover the row carrying
the is_commit flag regardless of whether it was already applied. To
avoid recovering the same data twice, recovery replaces the contents of
such a row with a NOP. Basically, the row is "recovered" only for the
sake of showing its is_commit flag to relay; relay will skip the row
anyway, since it remains local.
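
Below is a standalone sketch of the new decision, under the same
simplifications as above (toy_row, filter_fixed and the single
current_lsn are made up for illustration; the real code operates on
struct xrow_header, checks the per-replica vclock, advances it for
not-yet-applied rows and emits IPROTO_NOP):

    #include <stdbool.h>
    #include <stdio.h>

    struct toy_row {
        long lsn;
        bool is_commit;
        bool is_nop; /* set when the row is blanked out */
    };

    /*
     * Fixed filtering, modelled: already-applied rows are still skipped,
     * unless a transaction is in flight and the row carries is_commit.
     * Such a row is forwarded as a NOP, so the end flag reaches the
     * receiver without recovering the data twice.
     */
    static void
    filter_fixed(struct toy_row *rows, int n, long current_lsn)
    {
        bool is_sending_tx = false;
        for (int i = 0; i < n; i++) {
            struct toy_row *row = &rows[i];
            if (row->lsn <= current_lsn) {
                if (!is_sending_tx || !row->is_commit)
                    continue; /* already applied, skip */
                row->is_nop = true; /* keep the flag, drop the data */
            }
            is_sending_tx = !row->is_commit;
            printf("forward lsn=%ld nop=%d is_commit=%d\n",
                   row->lsn, (int)row->is_nop, (int)row->is_commit);
        }
    }

    int
    main(void)
    {
        /* Same transaction as before: a global row plus an already-applied
         * local row carrying the commit flag. */
        struct toy_row tx[] = {{11, false, false}, {5, true, false}};
        filter_fixed(tx, 2, 10);
        /* Output: lsn=11, then lsn=5 as a NOP with is_commit=1, so the
         * receiver sees a properly terminated transaction. */
        return 0;
    }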

Follow-up #8958
Closes #9491

NO_DOC=bugfix
---
 .../gh-9491-last-local-row-tx-boundary.md     |  4 ++
 src/box/recovery.cc                           | 35 +++++++---
 .../gh_9491_local_space_tx_boundary_test.lua  | 68 +++++++++++++++++++
 3 files changed, 98 insertions(+), 9 deletions(-)
 create mode 100644 changelogs/unreleased/gh-9491-last-local-row-tx-boundary.md
 create mode 100644 test/replication-luatest/gh_9491_local_space_tx_boundary_test.lua

diff --git a/changelogs/unreleased/gh-9491-last-local-row-tx-boundary.md b/changelogs/unreleased/gh-9491-last-local-row-tx-boundary.md
new file mode 100644
index 0000000000..340149cf2e
--- /dev/null
+++ b/changelogs/unreleased/gh-9491-last-local-row-tx-boundary.md
@@ -0,0 +1,4 @@
+## bugfix/replication
+
+* Fixed a bug where replication broke with `ER_PROTOCOL` when a transaction
+  ended with a local space operation (gh-9491).
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 5d7d6b04a3..afa63c9ec1 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -248,6 +248,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 	     const struct vclock *stop_vclock)
 {
 	struct xrow_header row;
+	bool is_sending_tx = false;
 	while (xlog_cursor_next_xc(&r->cursor, &row,
 				   r->wal_dir.force_recovery) == 0) {
 		if (++stream->row_count % WAL_ROWS_PER_YIELD == 0) {
@@ -276,15 +277,31 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		 */
 		assert(row.replica_id != 0 || row.group_id == GROUP_LOCAL);
 		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
-		if (row.lsn <= current_lsn)
-			continue; /* already applied, skip */
-		/*
-		 * We can promote the vclock either before or
-		 * after xstream_write(): it only makes any impact
-		 * in case of forced recovery, when we skip the
-		 * failed row anyway.
-		 */
-		vclock_follow_xrow(&r->vclock, &row);
+		if (row.lsn <= current_lsn) {
+			/*
+			 * Skip the already applied row, if it is not needed to
+			 * preserve transaction boundaries (is not the last row
+			 * of a currently recovered transaction). Otherwise,
+			 * replace it with a NOP, so that the transaction end
+			 * flag reaches the receiver, but the data isn't
+			 * recovered twice.
+			 */
+			if (!is_sending_tx || !row.is_commit)
+				continue; /* already applied, skip */
+			row.type = IPROTO_NOP;
+			row.bodycnt = 0;
+			row.body[0].iov_base = NULL;
+			row.body[0].iov_len = 0;
+		} else {
+			/*
+			 * We can promote the vclock either before or
+			 * after xstream_write(): it only makes any impact
+			 * in case of forced recovery, when we skip the
+			 * failed row anyway.
+			 */
+			vclock_follow_xrow(&r->vclock, &row);
+		}
+		is_sending_tx = !row.is_commit;
 		if (xstream_write(stream, &row) != 0) {
 			if (!r->wal_dir.force_recovery)
 				diag_raise();
diff --git a/test/replication-luatest/gh_9491_local_space_tx_boundary_test.lua b/test/replication-luatest/gh_9491_local_space_tx_boundary_test.lua
new file mode 100644
index 0000000000..77270e9605
--- /dev/null
+++ b/test/replication-luatest/gh_9491_local_space_tx_boundary_test.lua
@@ -0,0 +1,68 @@
+local t = require('luatest')
+local server = require('luatest.server')
+local replica_set = require('luatest.replica_set')
+
+local g = t.group()
+
+g.before_each(function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.box_cfg = {
+        replication = {
+            server.build_listen_uri('server1', cg.replica_set.id),
+            server.build_listen_uri('server2', cg.replica_set.id),
+        },
+        replication_timeout = 0.1,
+    }
+    cg.servers = {}
+    for i = 1, 2 do
+        cg.servers[i] = cg.replica_set:build_and_add_server{
+            alias = 'server' .. i,
+            box_cfg = cg.box_cfg,
+        }
+    end
+    cg.replica_set:start()
+    cg.replica_set:wait_for_fullmesh()
+    cg.servers[1]:exec(function()
+        box.schema.space.create('test')
+        box.space.test:create_index('pk')
+        box.schema.space.create('loc', {is_local = true})
+        box.space.loc:create_index('pk')
+    end)
+end)
+
+g.after_each(function(cg)
+    cg.replica_set:drop()
+end)
+
+--
+-- gh-9491: make sure that transactions ending with a local row are correctly
+-- recovered and replicated by relay.
+--
+g.test_local_row_tx_boundary = function(cg)
+    -- Stop replication, write some transactions to WAL, then restart
+    -- replication.
+    cg.servers[2]:update_box_cfg{replication = ""}
+    cg.servers[1]:exec(function()
+        box.begin()
+        box.space.test:replace{1}
+        box.space.loc:replace{1}
+        box.commit()
+        box.begin()
+        box.space.test:replace{2}
+        box.space.loc:replace{2}
+        box.commit()
+    end)
+    cg.servers[2]:update_box_cfg{replication = cg.box_cfg.replication}
+    t.helpers.retrying({}, function()
+        cg.servers[2]:assert_follows_upstream(cg.servers[1]:get_instance_id())
+        cg.servers[1]:wait_for_downstream_to(cg.servers[2])
+    end)
+    cg.servers[1]:exec(function(servers2_id)
+        t.assert_equals(box.info.replication[servers2_id].downstream.status,
+                        'follow')
+    end, {cg.servers[2]:get_instance_id()})
+    cg.servers[2]:exec(function()
+        t.assert_equals(box.space.test:select{}, {{1}, {2}})
+        t.assert_equals(box.space.loc:select{}, {})
+    end)
+end
-- 
GitLab