From 4b511eebe0c2d170ba953afc6c78c36921c8de6e Mon Sep 17 00:00:00 2001
From: Aleksandr Lyapunov <alyapunov@tarantool.org>
Date: Thu, 17 Mar 2022 18:32:57 +0300
Subject: [PATCH] txm: better detect isolation level in transaction

When a transaction is started without specifying isolation level
(which is impossible now) the transactional manager must choose
the transaction level automatically, that means that is must
detemine whether the transaction can see other prepared changes
or not. The best effort that we can made is to check if current
transaction is read-only or not. For read-only transactions
there are hope and fear that it will remain read-only, and the
best choice is not to see prepared changes. But if the transaction
has DML statements - it must see prepared changed.

Note that a read-only transaction can became read-write if it make
a DML statement. But if a transaction ignores some other prepared
change and then makes a DML, there are no other options except
abort that transaction - it could not be serialized anymore.

Part of #6930
Closes #6246
NO_DOC=see later commits
---
 .../gh-6246-differen-behaviour-of-select.md   |   3 +
 src/box/memtx_tx.c                            |  99 +++++-
 src/box/memtx_tx.h                            |  13 +-
 ...46_mvcc_different_select_behavior_test.lua | 333 ++++++++++++++++++
 4 files changed, 437 insertions(+), 11 deletions(-)
 create mode 100644 changelogs/unreleased/gh-6246-differen-behaviour-of-select.md
 create mode 100644 test/box-luatest/gh_6246_mvcc_different_select_behavior_test.lua

diff --git a/changelogs/unreleased/gh-6246-differen-behaviour-of-select.md b/changelogs/unreleased/gh-6246-differen-behaviour-of-select.md
new file mode 100644
index 0000000000..042cc1be61
--- /dev/null
+++ b/changelogs/unreleased/gh-6246-differen-behaviour-of-select.md
@@ -0,0 +1,3 @@
+## bugfix/core
+
+* Select in RO transaction now reads confirmed data, like a standalone (auotcommit) select does (gh-6452).
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index bdb96188e6..c8a36fc4c3 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -345,21 +345,72 @@ memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim)
 	return 0;
 }
 
+/**
+ * Fix position of @a txn in global read view list to preserve the list to
+ * be ordered by rv_psn. Can only move txn to the beginning of the list.
+ * The function must be called when a transaction A sends itself to a read view
+ * (perhaps a deeper read view in case when it's already in a read view) because
+ * it has to skip a statement of another B, prepared transaction.
+ * The transaction is always added to the tail of read view list, but in this
+ * case there's no guarantee that psn of B is the greatest psn of all prepared
+ * transactions, so we have to additionally and push A in the global read view
+ * list, jumping over read views with greater rv_psn.
+ */
+static void
+memtx_tx_adjust_position_in_read_view_list(struct txn *txn)
+{
+	if (txn->in_read_view_txs.prev == &txm.read_view_txs)
+		return; /* No transaction before */
+	struct txn *prev_txn = rlist_prev_entry(txn, in_read_view_txs);
+	if (prev_txn->rv_psn <= txn->rv_psn)
+		return; /* The order is already correct. */
+	/* Remove from list for a while. */
+	rlist_del(&txn->in_read_view_txs);
+	while (prev_txn->in_read_view_txs.prev != &txm.read_view_txs) {
+		struct txn *scan = rlist_prev_entry(prev_txn, in_read_view_txs);
+		if (scan->rv_psn <= txn->rv_psn)
+			break;
+		prev_txn = scan;
+	}
+	/* Insert before prev_txn. */
+	rlist_add_tail(&prev_txn->in_read_view_txs, &txn->in_read_view_txs);
+}
+
 void
 memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim)
 {
 	assert(breaker->psn != 0);
-	if (victim->status != TXN_INPROGRESS) {
+	if (victim->status != TXN_INPROGRESS &&
+	    victim->status != TXN_IN_READ_VIEW) {
 		/* Was conflicted by somebody else. */
 		return;
 	}
 	if (stailq_empty(&victim->stmts)) {
-		/* Send to read view. */
-		victim->status = TXN_IN_READ_VIEW;
-		victim->rv_psn = breaker->psn;
-		rlist_add_tail(&txm.read_view_txs, &victim->in_read_view_txs);
+		assert((victim->status == TXN_IN_READ_VIEW) ==
+		       (victim->rv_psn != 0));
+		/* Send to read view, perhaps a deeper one. */
+		if (victim->status != TXN_IN_READ_VIEW) {
+			victim->status = TXN_IN_READ_VIEW;
+			victim->rv_psn = breaker->psn;
+			rlist_add_tail(&txm.read_view_txs,
+				       &victim->in_read_view_txs);
+		} else if (victim->rv_psn > breaker->psn) {
+			/*
+			 * Note that in every case for every key we may choose
+			 * any read view psn between confirmed level and the
+			 * oldest prepared transaction that changes that key.
+			 * But we choose the latest level because it generally
+			 * costs less, and if there are several breakers - we
+			 * must sequentially decrease read view level.
+			 */
+			victim->rv_psn = breaker->psn;
+			assert(victim->rv_psn != 0);
+		}
+		memtx_tx_adjust_position_in_read_view_list(victim);
 	} else {
 		/* Mark as conflicted. */
+		if (victim->status == TXN_IN_READ_VIEW)
+			rlist_del(&victim->in_read_view_txs);
 		victim->status = TXN_CONFLICTED;
 		txn_set_flags(victim, TXN_IS_CONFLICTED);
 	}
@@ -1910,10 +1961,27 @@ memtx_tx_tuple_clarify_impl(struct txn *txn, struct space *space,
 		if (memtx_tx_story_is_visible(story, txn, is_prepared_ok,
 					      &result, &own_change))
 			break;
+		if (story->add_psn != 0 && story->add_stmt != NULL &&
+		    txn != NULL) {
+			/*
+			 * If we skip prepared story then the transaction
+			 * must be before prepared in serialization order.
+			 * That can be a read view or conflict already.
+			 */
+			memtx_tx_handle_conflict(story->add_stmt->txn, txn);
+		}
 		if (story->link[index->dense_id].older_story == NULL)
 			break;
 		story = story->link[index->dense_id].older_story;
 	}
+	if (story->del_psn != 0 && story->del_stmt != NULL && txn != NULL) {
+		/*
+		 * If we see a tuple that is deleted by prepared transaction
+		 * then the transaction must be before prepared in serialization
+		 * order. That can be a read view or conflict already.
+		 */
+		memtx_tx_handle_conflict(story->del_stmt->txn, txn);
+	}
 	if (!own_change) {
 		/*
 		 * If the result tuple exists (is visible) - it is visible in
@@ -1932,6 +2000,23 @@ memtx_tx_tuple_clarify_impl(struct txn *txn, struct space *space,
 	return result;
 }
 
+/**
+ * Helper of @sa memtx_tx_tuple_clarify.
+ * Detect whether the transaction can see prepared, but unconfirmed commits.
+ */
+static bool
+detect_whether_prepared_ok(struct txn *txn)
+{
+	/*
+	 * The best effort that we can make is to determine whether the
+	 * transaction is read-only or not. For read only (including autocommit
+	 * select, that is txn == NULL) we should see only confirmed changes,
+	 * ignoring prepared. For read-write transaction we should see prepared
+	 * changes in order to avoid conflicts.
+	 */
+	return txn != NULL && !stailq_empty(&txn->stmts);
+}
+
 /**
  * Helper of @sa memtx_tx_tuple_clarify.
  * Detect is_prepared_ok flag and pass the job to memtx_tx_tuple_clarify_impl.
@@ -1941,7 +2026,7 @@ memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,
 			    struct tuple *tuple, struct index *index,
 			    uint32_t mk_index)
 {
-	bool is_prepared_ok = txn != NULL;
+	bool is_prepared_ok = detect_whether_prepared_ok(txn);
 	return memtx_tx_tuple_clarify_impl(txn, space, tuple, index, mk_index,
 					   is_prepared_ok);
 }
@@ -1966,7 +2051,7 @@ memtx_tx_index_invisible_count_slow(struct txn *txn,
 		}
 
 		struct tuple *visible = NULL;
-		bool is_prepared_ok = txn != NULL;
+		bool is_prepared_ok = detect_whether_prepared_ok(txn);
 		memtx_tx_story_find_visible_tuple(story, txn, index->dense_id,
 						  is_prepared_ok, &visible);
 		if (visible == NULL)
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index 93f6d8161c..a32116709f 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -212,11 +212,16 @@ int
 memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim);
 
 /**
- * Handle conflict when @a breaker transaction is prepared.
- * The conflict is happened if @a victim have read something that @a breaker
- * overwrites.
+ * Handle conflict when @a victim has read and @a breaker has written the same
+ * key, and @a breaker is prepared. The functions must be called in two cases:
+ * 1. @a breaker becomes prepared for every victim with non-empty intersection
+ * of victim read set / breaker write set.
+ * 2. @a victim has to read confirmed value and skips the value that prepared
+ * @a breaker wrote.
  * If @a victim is read-only or hasn't made any changes, it should be sent
- * to read view, in which is will not see @a breaker.
+ * to read view, in which is will not see @a breaker's changes. If @a victim
+ * is already in a read view - a read view that does not see every breaker
+ * changes is chosen.
  * Otherwise @a victim must be marked as conflicted and aborted on occasion.
  */
 void
diff --git a/test/box-luatest/gh_6246_mvcc_different_select_behavior_test.lua b/test/box-luatest/gh_6246_mvcc_different_select_behavior_test.lua
new file mode 100644
index 0000000000..68a48d36c7
--- /dev/null
+++ b/test/box-luatest/gh_6246_mvcc_different_select_behavior_test.lua
@@ -0,0 +1,333 @@
+local server = require('test.luatest_helpers.server')
+local t = require('luatest')
+
+local g = t.group()
+
+g.before_all = function()
+    g.server = server:new{
+        alias   = 'default',
+        box_cfg = {memtx_use_mvcc_engine = true}
+    }
+    g.server:start()
+end
+
+g.after_all = function()
+    g.server:drop()
+end
+
+g.before_each(function()
+    g.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:create_index('primary')
+    end)
+end)
+
+g.after_each(function()
+    g.server:exec(function() box.space.test:drop() end)
+end)
+
+-- Select in RO transaction is similar to autocommit.
+g.test_mvcc_different_select_behavior_simple = function()
+    g.server:exec(function()
+        local t = require('luatest')
+        local fiber = require('fiber')
+        local s = box.space.test
+
+        local f = fiber.create(function()
+            fiber.self():set_joinable(true)
+            s:insert{1}
+        end)
+
+        t.assert_equals(s:select(), {})
+        t.assert_equals(s:select(1), {})
+        t.assert_equals(s:count(), 0)
+        box.begin()
+        local res1 = s:select()
+        local res2 = s:select(1)
+        local res3 = s:count()
+        box.commit()
+        t.assert_equals(res1, {})
+        t.assert_equals(res2, {})
+        t.assert_equals(res3, 0)
+
+        f:join()
+
+        t.assert_equals(s:select(), {{1}})
+        t.assert_equals(s:select(1), {{1}})
+        t.assert_equals(s:count(), 1)
+        box.begin()
+        local res1 = s:select()
+        local res2 = s:select(1)
+        local res3 = s:count()
+        box.commit()
+        t.assert_equals(res1, {{1}})
+        t.assert_equals(res2, {{1}})
+        t.assert_equals(res3, 1)
+    end)
+end
+
+-- for RW transactions prepared statements are visible.
+g.test_mvcc_different_select_behavior_rw = function()
+    g.server:exec(function()
+        local t = require('luatest')
+        local fiber = require('fiber')
+        local s = box.space.test
+
+        local f = fiber.create(function()
+            fiber.self():set_joinable(true)
+            s:insert{1}
+        end)
+
+        box.begin()
+        s:replace{2}
+        local res1 = s:select()
+        local res2 = s:select(1)
+        local res3 = s:count()
+        box.commit()
+        t.assert_equals(res1, {{1}, {2}})
+        t.assert_equals(res2, {{1}})
+        t.assert_equals(res3, 2)
+
+        f:join()
+
+        t.assert_equals(s:select(), {{1}, {2}})
+        t.assert_equals(s:select(1), {{1}})
+        t.assert_equals(s:count(), 2)
+    end)
+end
+
+-- If RO transaction becomes RW - it can be aborted.
+g.test_mvcc_different_select_behavior_ro_rw = function()
+    g.server:exec(function()
+        local t = require('luatest')
+        local fiber = require('fiber')
+        local s = box.space.test
+
+        local f = fiber.create(function()
+            fiber.self():set_joinable(true)
+            s:insert{1}
+        end)
+
+        box.begin()
+        local res1 = s:select()
+        local res2 = s:select(1)
+        local res3 = s:count()
+        s:replace{2}
+        t.assert_error_msg_content_equals(
+            "Transaction has been aborted by conflict",
+            function() box.commit() end)
+        t.assert_equals(res1, {})
+        t.assert_equals(res2, {})
+        t.assert_equals(res3, 0)
+
+        f:join()
+
+        t.assert_equals(s:select(), {{1}})
+        t.assert_equals(s:select(1), {{1}})
+        t.assert_equals(s:count(), 1)
+    end)
+end
+
+        -- Similar conflict but with skipped delete.
+g.test_mvcc_different_select_behavior_ro_rw_delete = function()
+    g.server:exec(function()
+        local t = require('luatest')
+        local fiber = require('fiber')
+        local s = box.space.test
+
+        s:replace{1}
+
+        local f = fiber.create(function()
+            fiber.self():set_joinable(true)
+            s:delete{1}
+        end)
+
+        box.begin()
+        local res1 = s:select()
+        local res2 = s:select(1)
+        local res3 = s:count()
+        s:replace{2}
+        t.assert_error_msg_content_equals(
+            "Transaction has been aborted by conflict",
+            function() box.commit() end)
+
+        t.assert_equals(res1, {{1}})
+        t.assert_equals(res2, {{1}})
+        t.assert_equals(res3, 1)
+
+        f:join()
+
+        t.assert_equals(s:select(), {})
+        t.assert_equals(s:select(1), {})
+        t.assert_equals(s:count(), 0)
+    end)
+end
+
+-- Special check that order of read views is correct.
+g.test_mvcc_different_select_behavior_reordering = function()
+    g.server:exec(function()
+        local t = require('luatest')
+        local fiber = require('fiber')
+        local s = box.space.test
+        s:replace{0, 0}
+
+        local fibers = {}
+        local num_fibers = 10
+        local half = math.modf(num_fibers / 2)
+        local num_finished = 0
+        for i = 1, num_fibers do
+            local f = fiber.create(function()
+                fiber.self():set_joinable(true)
+                box.begin()
+                s:replace{i}
+                s:replace{0, i}
+                box.commit()
+                num_finished = num_finished + 1
+            end)
+            table.insert(fibers, f)
+        end
+
+        local num1,res1,num2,res2,res3
+        local cond = fiber.cond()
+        local reader = fiber.create(function()
+            fiber.self():set_joinable(true)
+            box.begin()
+            num1 = num_finished
+            res1 = s:select{half}
+            cond:wait()
+            num2 = num_finished
+            res2 = s:select{half}
+            res3 = s:select{0}
+            box.commit()
+        end)
+
+        box.begin()
+        t.assert_equals(s:select{num_fibers}, {}) -- highest level
+        t.assert_equals(s:select{1}, {}) -- lowest level
+
+        t.assert_equals(num_finished, 0)
+        for _,f in pairs(fibers) do
+           f:join()
+        end
+        t.assert_equals(num_finished, num_fibers)
+
+        -- Create transaction noise to force TX GC to do the job.
+        for i = num_fibers + 1, num_fibers * 3 do
+            fiber.create(function()
+                fiber.self():set_joinable(true)
+                s:replace{i}
+            end):join()
+        end
+
+        cond:signal()
+        reader:join()
+
+        t.assert_equals(s:select{num_fibers}, {}) -- highest level
+        t.assert_equals(s:select{1}, {}) -- lowest level
+        t.assert_equals(s:select{0}, {{0, 0}}) -- lowest level
+        box.commit()
+
+        -- Before RW transaction completed...
+        t.assert_equals(num1, 0)
+        -- ... we cannot see their changes.
+        t.assert_equals(res1, {})
+        -- After RW transaction completed...
+        t.assert_equals(num2, num_fibers)
+        -- ... we still cannot see their changes (repeatable read).
+        t.assert_equals(res2, {})
+        -- And now our read view show changes right tx number 'half'.
+        t.assert_equals(res3, {{0, half - 1}})
+    end)
+end
+
+-- Another check that order of read views is correct.
+g.test_mvcc_different_select_behavior_another_reordering = function()
+    g.server:exec(function()
+        local t = require('luatest')
+        local fiber = require('fiber')
+        local s = box.space.test
+
+        local fibers = {}
+        local num_fibers = 10
+        local num_finished = 0
+
+        s:replace{1, 0}
+        s:replace{num_fibers, 0}
+
+        local res11,num11,res12,num12
+        local cond1 = fiber.cond()
+        local reader1 = fiber.create(function()
+            fiber.self():set_joinable(true)
+            box.begin()
+            res11 = s:select{num_fibers}
+            num11 = num_finished
+            cond1:wait()
+            res12 = s:select{num_fibers}
+            num12 = num_finished
+            box.commit()
+        end)
+
+        for i = 1, num_fibers do
+            local f = fiber.create(function()
+                fiber.self():set_joinable(true)
+                box.begin()
+                s:replace{i, 1} -- reader1 was sent to read view.
+                box.commit()
+                num_finished = num_finished + 1
+            end)
+            table.insert(fibers, f)
+        end
+
+        local res21,num21,res22,num22
+        local cond2 = fiber.cond()
+        local reader2 = fiber.create(function()
+            fiber.self():set_joinable(true)
+            box.begin()
+            res21 = s:select{1} -- go to read view since we cannot see prepared.
+            num21 = num_finished
+            cond2:wait()
+            res22 = s:select{1}
+            num22 = num_finished
+            box.commit()
+        end)
+
+        for _,f in pairs(fibers) do
+           f:join()
+        end
+        t.assert_equals(num_finished, num_fibers)
+
+        -- Create transaction noise to force TX GC to do the job.
+        for i = num_fibers + 1, num_fibers * 3 do
+            fiber.create(function()
+                fiber.self():set_joinable(true)
+                s:replace{i}
+            end):join()
+        end
+
+        -- Finish TX in the first fiber.
+        cond1:signal()
+        reader1:join()
+
+        -- Create transaction noise to force TX GC to do the job.
+        for i = num_fibers + 1, num_fibers * 3 do
+            fiber.create(function()
+                fiber.self():set_joinable(true)
+                s:replace{i}
+            end):join()
+        end
+
+        -- Finish TX in the second fiber.
+        cond2:signal()
+        reader2:join()
+
+        t.assert_equals(res11, {{num_fibers, 0}})
+        t.assert_equals(num11, 0)
+        t.assert_equals(res12, {{num_fibers, 0}})
+        t.assert_equals(num12, num_finished)
+        t.assert_equals(res21, {{1, 0}})
+        t.assert_equals(num21, 0)
+        t.assert_equals(res22, {{1, 0}})
+        t.assert_equals(num22, num_finished)
+
+    end)
+end
-- 
GitLab