From 5b969d946e7dfdc8f08eb04c59e510bcfbaeaa94 Mon Sep 17 00:00:00 2001
From: Nikita Zheleztsov <n.zheleztsov@proton.me>
Date: Fri, 16 Jun 2023 20:03:54 +0300
Subject: [PATCH] limbo: fix commit/rollback failures with triggers

Currently some transactions on synchronous space fail to complete with
the `ER_CURSOR_NO_TRANSACTION` error, when on_rollback/on_commit triggers
are set.

This is caused due to the fact, that some rollback/commit triggers
require in_txn fiber variable to be set but it's not done when a
transaction is completed from the limbo. Callbacks, which are used to
work with iterators (`lbox_txn_pairs` and `lbox_txn_iterator_next`),
acquire tnx statements from the current transactions, but they cannot
do that, when this transaction is not assigned to the current fiber, so
`ER_CURSOR_NO_TRANSACTION` is thrown.

Let's assign in_txn variable when we complete transaction from the limbo.
Moreover, let's add assertions, which check whether in_txn() is correct,
in order to be sure, that `txn_complete_success/fail` always run with
in_txn set.

Closes #8505

NO_DOC=bugfix

(cherry picked from commit 6fadc8a0812650afcd46bdffd63ae13c650b252a)
---
 .../gh-8505-synchro-triggers-fail.md          |  5 ++
 src/box/txn.c                                 |  2 +
 src/box/txn_limbo.c                           | 22 ++++-
 .../gh_8505_synchro_triggers_test.lua         | 80 +++++++++++++++++++
 4 files changed, 106 insertions(+), 3 deletions(-)
 create mode 100644 changelogs/unreleased/gh-8505-synchro-triggers-fail.md
 create mode 100644 test/replication-luatest/gh_8505_synchro_triggers_test.lua

diff --git a/changelogs/unreleased/gh-8505-synchro-triggers-fail.md b/changelogs/unreleased/gh-8505-synchro-triggers-fail.md
new file mode 100644
index 0000000000..9581aa8a46
--- /dev/null
+++ b/changelogs/unreleased/gh-8505-synchro-triggers-fail.md
@@ -0,0 +1,5 @@
+## bugfix/core
+
+* Fixed a bug causing the `ER_CURSOR_NO_TRANSACTION` failure for transactions
+  on synchronous spaces when the `on_commit/on_rollback` triggers are set
+  (gh-8505).
diff --git a/src/box/txn.c b/src/box/txn.c
index b26b599bd6..5d7e7c1fef 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -755,6 +755,7 @@ txn_complete_fail(struct txn *txn)
 	assert(!txn_has_flag(txn, TXN_IS_DONE));
 	assert(txn->signature < 0);
 	assert(txn->signature != TXN_SIGNATURE_UNKNOWN);
+	assert(in_txn() == txn);
 	if (txn->limbo_entry != NULL) {
 		assert(txn_has_flag(txn, TXN_WAIT_SYNC));
 		txn_limbo_abort(&txn_limbo, txn->limbo_entry);
@@ -787,6 +788,7 @@ txn_complete_success(struct txn *txn)
 	assert(!txn_has_flag(txn, TXN_IS_DONE));
 	assert(!txn_has_flag(txn, TXN_WAIT_SYNC));
 	assert(txn->signature >= 0);
+	assert(in_txn() == txn);
 	txn->status = TXN_COMMITTED;
 	if (txn->engine != NULL)
 		engine_commit(txn->engine, txn);
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 2d838413f6..463ba661ff 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -72,6 +72,22 @@ txn_limbo_is_ro(struct txn_limbo *limbo)
 	       (limbo->owner_id != instance_id || txn_limbo_is_frozen(limbo));
 }
 
+void
+txn_limbo_complete(struct txn *txn, bool is_success)
+{
+	/*
+	 * Some rollback/commit triggers require the in_txn fiber
+	 * variable to be set.
+	 */
+	assert(in_txn() == NULL);
+	fiber_set_txn(fiber(), txn);
+	if (is_success)
+		txn_complete_success(txn);
+	else
+		txn_complete_fail(txn);
+	fiber_set_txn(fiber(), NULL);
+}
+
 struct txn_limbo_entry *
 txn_limbo_last_synchro_entry(struct txn_limbo *limbo)
 {
@@ -287,7 +303,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 		e->txn->limbo_entry = NULL;
 		txn_limbo_abort(limbo, e);
 		txn_clear_flags(e->txn, TXN_WAIT_SYNC | TXN_WAIT_ACK);
-		txn_complete_fail(e->txn);
+		txn_limbo_complete(e->txn, false);
 		if (e == entry)
 			break;
 		fiber_wakeup(e->txn->fiber);
@@ -461,7 +477,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 		 * after the affected transactions.
 		 */
 		assert(e->txn->signature >= 0);
-		txn_complete_success(e->txn);
+		txn_limbo_complete(e->txn, true);
 	}
 	/*
 	 * Track CONFIRM lsn on replica in order to detect split-brain by
@@ -514,7 +530,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 		assert(e->txn->signature >= 0);
 		e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
 		e->txn->limbo_entry = NULL;
-		txn_complete_fail(e->txn);
+		txn_limbo_complete(e->txn, false);
 		if (e == last_rollback)
 			break;
 	}
diff --git a/test/replication-luatest/gh_8505_synchro_triggers_test.lua b/test/replication-luatest/gh_8505_synchro_triggers_test.lua
new file mode 100644
index 0000000000..205443ec62
--- /dev/null
+++ b/test/replication-luatest/gh_8505_synchro_triggers_test.lua
@@ -0,0 +1,80 @@
+local t = require('luatest')
+local replica_set = require('luatest.replica_set')
+local server = require('luatest.server')
+local proxy = require('luatest.replica_proxy')
+
+local g = t.group('gh-8505-synchro-triggers')
+
+g.before_all(function(g)
+    g.replica_set = replica_set:new({})
+    local rs_id = g.replica_set.id
+    g.box_cfg = {
+        replication_timeout = 0.01,
+        election_fencing_mode = 'off',
+        replication = {
+            server.build_listen_uri('server1', rs_id),
+            server.build_listen_uri('server2', rs_id),
+        },
+    }
+
+    g.box_cfg.election_mode = 'candidate'
+    g.server1 = g.replica_set:build_and_add_server{
+        alias = 'server1',
+        box_cfg = g.box_cfg,
+    }
+
+    g.proxy1 = proxy:new{
+        client_socket_path = server.build_listen_uri('server1_proxy'),
+        server_socket_path = server.build_listen_uri('server1', rs_id),
+    }
+    t.assert(g.proxy1:start{force = true}, 'Proxy from 2 to 1 started')
+    g.box_cfg.replication[1] = server.build_listen_uri('server1_proxy')
+    g.box_cfg.election_mode = 'voter'
+    g.server2 = g.replica_set:build_and_add_server{
+        alias = 'server2',
+        box_cfg = g.box_cfg,
+    }
+
+    g.replica_set:start()
+    g.server1:wait_for_election_leader()
+    g.server1:exec(function()
+        box.schema.create_space('test', {is_sync = true}):create_index('pk')
+    end)
+    g.server2:wait_for_vclock_of(g.server1)
+end)
+
+g.after_all(function(g)
+    g.replica_set:drop()
+end)
+
+g.test_on_commit_trigger = function(g)
+    g.server1:exec(function()
+        box.begin()
+        box.on_commit(function(iter) iter() end)
+        box.space.test:upsert({1}, {{'=', 1, 1}})
+        box.commit()
+    end)
+end
+
+g.test_on_rollback_trigger = function(g)
+    -- Force ACK gathering to fail and cause rollback. It's not enough
+    -- to set a small timeout, as a transaction can be committed anyway:
+    -- fibers don't yield so often, compared to such a tiny timeout, ACKs
+    -- can be processed before the transaction's rollback happens due to
+    -- a timeout error. So, let's break connection with proxy.
+    g.server1:update_box_cfg({ replication_synchro_timeout = 1e-9 })
+    g.server1:wait_for_election_leader()
+    g.proxy1:pause()
+
+    g.server1:exec(function()
+        box.begin()
+        box.on_rollback(function(iter) iter() end)
+        box.space.test:upsert({1}, {{'=', 1, 1}})
+        local _, err = pcall(box.commit)
+        t.assert_equals(err.code, box.error.SYNC_QUORUM_TIMEOUT)
+    end)
+
+    g.proxy1:resume()
+    g.server1:update_box_cfg({ replication_synchro_timeout = 5 })
+    g.server1:wait_for_election_leader()
+end
-- 
GitLab