From 588170a77c9aba3d0cf898d0134286c6ffd46339 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov@tarantool.org>
Date: Wed, 12 Oct 2022 18:33:19 +0300
Subject: [PATCH] vinyl: implement transaction isolation levels

This commit adds support of transaction isolation levels introduced
earlier for memtx mvcc by commit ec750af66d949 ("txm: introduce
transaction isolation levels"). The isolation levels work exactly in
the same way as in memtx:

 - Unless a transaction explicitly specifies the 'read-committed'
   isolation level, it'll skip prepared statements, even if they are
   visible from its read view. The background for this was implemented
   in the previous patches, which added the is_prepared_ok flag to
   cache and mem iterators.

 - If a transaction skips a prepared statement, which would otherwise be
   visible from its read view, it's sent to the most recent read view
   preceding the prepared statement LSN. Note, older prepared statements
   are still visible from this read view and can actually be selected if
   committed later.

 - A transaction using the 'best-effort' isolation level (default) is
   switched to 'read-committed' when it executes the first write
   statement.

The implementation is tested by the existing memtx mvcc tests that were
made multi-engine in the scope of this commit. However, we add one more
test case - the one that checks that a 'best-effort' read view is
properly updated in case there is more than one prepared transaction.
Also, there are few tests that relied upon the old implementation and
assumed that select from Vinyl may return unconfirmed tuples. We update
those tests here as well.

Closes #5522

NO_DOC=already documented
---
 .../gh-5522-vy-tx-isolation-level.md          |  5 ++
 src/box/vinyl.c                               |  4 +-
 src/box/vy_point_lookup.c                     | 55 ++++++++----
 src/box/vy_read_iterator.c                    | 43 +++++++---
 src/box/vy_tx.c                               | 86 ++++++++++++-------
 src/box/vy_tx.h                               | 38 +++++++-
 test/box-luatest/suite.ini                    |  2 +-
 .../gh_6930_mvcc_isolation_levels_test.lua    | 69 +++++++++++----
 .../gh_6930_mvcc_net_box_iso_test.lua         | 49 ++++++-----
 .../gh-5167-qsync-rollback-snap.result        | 15 +++-
 .../gh-5167-qsync-rollback-snap.test.lua      |  9 +-
 test/replication/qsync_snapshots.result       | 15 +++-
 test/replication/qsync_snapshots.test.lua     |  9 +-
 test/sql/errinj.result                        |  9 ++
 test/sql/errinj.test.lua                      |  4 +
 test/vinyl/errinj.result                      |  6 ++
 test/vinyl/errinj.test.lua                    |  2 +
 test/vinyl/errinj_tx.result                   | 15 ++++
 test/vinyl/errinj_tx.test.lua                 |  7 ++
 .../gh-3395-read-prepared-uncommitted.result  | 52 ++++++++---
 ...gh-3395-read-prepared-uncommitted.test.lua | 40 ++++++---
 test/vinyl/upsert.result                      |  6 +-
 test/vinyl/upsert.test.lua                    |  6 +-
 23 files changed, 411 insertions(+), 135 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5522-vy-tx-isolation-level.md
 rename test/{box-luatest => engine-luatest}/gh_6930_mvcc_isolation_levels_test.lua (77%)
 rename test/{box-luatest => engine-luatest}/gh_6930_mvcc_net_box_iso_test.lua (80%)

diff --git a/changelogs/unreleased/gh-5522-vy-tx-isolation-level.md b/changelogs/unreleased/gh-5522-vy-tx-isolation-level.md
new file mode 100644
index 0000000000..df63925d31
--- /dev/null
+++ b/changelogs/unreleased/gh-5522-vy-tx-isolation-level.md
@@ -0,0 +1,5 @@
+## feature/vinyl
+
+* Added support of transaction isolation levels for the Vinyl engine.
+  The `txn_isolation` option passed to `box.begin()` now has the same
+  effect for Vinyl and memtx (gh-5522).
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 80821b70a8..cc2ae6b368 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2424,7 +2424,7 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn)
 {
 	struct vy_env *env = vy_env(engine);
 	assert(txn->engine_tx == NULL);
-	txn->engine_tx = vy_tx_begin(env->xm);
+	txn->engine_tx = vy_tx_begin(env->xm, txn->isolation);
 	if (txn->engine_tx == NULL)
 		return -1;
 	return 0;
@@ -3045,7 +3045,7 @@ vinyl_engine_prepare_join(struct engine *engine, void **arg)
 		return -1;
 	}
 	rlist_create(&ctx->entries);
-	ctx->rv = vy_tx_manager_read_view(env->xm);
+	ctx->rv = vy_tx_manager_read_view(env->xm, /*plsn=*/INT64_MAX);
 	if (ctx->rv == NULL) {
 		free(ctx);
 		return -1;
diff --git a/src/box/vy_point_lookup.c b/src/box/vy_point_lookup.c
index 8d73328f25..c4e4b6cf68 100644
--- a/src/box/vy_point_lookup.c
+++ b/src/box/vy_point_lookup.c
@@ -74,12 +74,14 @@ vy_point_lookup_scan_txw(struct vy_lsm *lsm, struct vy_tx *tx,
  */
 static int
 vy_point_lookup_scan_cache(struct vy_lsm *lsm, const struct vy_read_view **rv,
-			   struct vy_entry key, struct vy_history *history)
+			   bool is_prepared_ok, struct vy_entry key,
+			   struct vy_history *history)
 {
 	lsm->cache.stat.lookup++;
 	struct vy_entry entry = vy_cache_get(&lsm->cache, key);
 
-	if (entry.stmt == NULL || vy_stmt_lsn(entry.stmt) > (*rv)->vlsn)
+	if (entry.stmt == NULL || vy_stmt_lsn(entry.stmt) > (*rv)->vlsn ||
+	    (!is_prepared_ok && vy_stmt_is_prepared(entry.stmt)))
 		return 0;
 
 	vy_stmt_counter_acct_tuple(&lsm->cache.stat.get, entry.stmt);
@@ -92,16 +94,18 @@ vy_point_lookup_scan_cache(struct vy_lsm *lsm, const struct vy_read_view **rv,
  */
 static int
 vy_point_lookup_scan_mem(struct vy_lsm *lsm, struct vy_mem *mem,
-			 const struct vy_read_view **rv,
-			 struct vy_entry key, struct vy_history *history)
+			 const struct vy_read_view **rv, bool is_prepared_ok,
+			 struct vy_entry key, struct vy_history *history,
+			 int64_t *min_skipped_plsn)
 {
 	struct vy_mem_iterator mem_itr;
 	vy_mem_iterator_open(&mem_itr, &lsm->stat.memory.iterator,
-			     mem, ITER_EQ, key, rv, /*is_prepared_ok=*/true);
+			     mem, ITER_EQ, key, rv, is_prepared_ok);
 	struct vy_history mem_history;
 	vy_history_create(&mem_history, &lsm->env->history_node_pool);
 	int rc = vy_mem_iterator_next(&mem_itr, &mem_history);
 	vy_history_splice(history, &mem_history);
+	*min_skipped_plsn = MIN(*min_skipped_plsn, mem_itr.min_skipped_plsn);
 	vy_mem_iterator_close(&mem_itr);
 	return rc;
 
@@ -112,17 +116,31 @@ vy_point_lookup_scan_mem(struct vy_lsm *lsm, struct vy_mem *mem,
  * Add found statements to the history list up to terminal statement.
  */
 static int
-vy_point_lookup_scan_mems(struct vy_lsm *lsm, const struct vy_read_view **rv,
+vy_point_lookup_scan_mems(struct vy_lsm *lsm, struct vy_tx *tx,
+			  const struct vy_read_view **rv, bool is_prepared_ok,
 			  struct vy_entry key, struct vy_history *history)
 {
 	assert(lsm->mem != NULL);
-	int rc = vy_point_lookup_scan_mem(lsm, lsm->mem, rv, key, history);
+	int64_t min_skipped_plsn = INT64_MAX;
+	if (vy_point_lookup_scan_mem(lsm, lsm->mem, rv, is_prepared_ok,
+				     key, history, &min_skipped_plsn) != 0)
+		return -1;
 	struct vy_mem *mem;
 	rlist_foreach_entry(mem, &lsm->sealed, in_sealed) {
-		if (rc != 0 || vy_history_is_terminal(history))
-			return rc;
-
-		rc = vy_point_lookup_scan_mem(lsm, mem, rv, key, history);
+		if (vy_history_is_terminal(history))
+			break;
+		if (vy_point_lookup_scan_mem(lsm, mem, rv, is_prepared_ok,
+					     key, history,
+					     &min_skipped_plsn) != 0)
+			return -1;
+	}
+	if (tx != NULL && min_skipped_plsn != INT64_MAX) {
+		if (vy_tx_send_to_read_view(tx, min_skipped_plsn) != 0)
+			return -1;
+		if (tx->state == VINYL_TX_ABORT) {
+			diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+			return -1;
+		}
 	}
 	return 0;
 }
@@ -214,12 +232,14 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx,
 	if (rc != 0 || vy_history_is_terminal(&history))
 		goto done;
 
-	rc = vy_point_lookup_scan_cache(lsm, rv, key, &history);
+	bool is_prepared_ok = tx != NULL ? vy_tx_is_prepared_ok(tx) : false;
+	rc = vy_point_lookup_scan_cache(lsm, rv, is_prepared_ok, key, &history);
 	if (rc != 0 || vy_history_is_terminal(&history))
 		goto done;
 
 restart:
-	rc = vy_point_lookup_scan_mems(lsm, rv, key, &mem_history);
+	rc = vy_point_lookup_scan_mems(lsm, tx, rv, is_prepared_ok,
+				       key, &mem_history);
 	if (rc != 0 || vy_history_is_terminal(&mem_history))
 		goto done;
 
@@ -271,7 +291,8 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx,
 		 * matching the search key.
 		 */
 		vy_history_cleanup(&mem_history);
-		rc = vy_point_lookup_scan_mems(lsm, rv, key, &mem_history);
+		rc = vy_point_lookup_scan_mems(lsm, tx, rv, is_prepared_ok,
+					       key, &mem_history);
 		if (rc != 0)
 			goto done;
 		if (vy_history_is_terminal(&mem_history))
@@ -306,11 +327,13 @@ vy_point_lookup_mem(struct vy_lsm *lsm, const struct vy_read_view **rv,
 	struct vy_history history;
 	vy_history_create(&history, &lsm->env->history_node_pool);
 
-	rc = vy_point_lookup_scan_cache(lsm, rv, key, &history);
+	rc = vy_point_lookup_scan_cache(lsm, rv, /*is_prepared_ok=*/true,
+					key, &history);
 	if (rc != 0 || vy_history_is_terminal(&history))
 		goto done;
 
-	rc = vy_point_lookup_scan_mems(lsm, rv, key, &history);
+	rc = vy_point_lookup_scan_mems(lsm, /*tx=*/NULL, rv,
+				       /*is_prepared_ok=*/true, key, &history);
 	if (rc != 0 || vy_history_is_terminal(&history))
 		goto done;
 
diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c
index ca57d9ae53..7b362caf65 100644
--- a/src/box/vy_read_iterator.c
+++ b/src/box/vy_read_iterator.c
@@ -355,7 +355,8 @@ vy_read_iterator_scan_cache(struct vy_read_iterator *itr,
 
 static NODISCARD int
 vy_read_iterator_scan_mem(struct vy_read_iterator *itr, uint32_t mem_src,
-			  struct vy_entry *next, bool *stop)
+			  struct vy_entry *next, bool *stop,
+			  int64_t *min_skipped_plsn)
 {
 	int rc;
 	struct vy_read_src *src = &itr->src[mem_src];
@@ -375,8 +376,8 @@ vy_read_iterator_scan_mem(struct vy_read_iterator *itr, uint32_t mem_src,
 	}
 	if (rc < 0)
 		return -1;
-
 	vy_read_iterator_evaluate_src(itr, src, next, stop);
+	*min_skipped_plsn = MIN(*min_skipped_plsn, src_itr->min_skipped_plsn);
 	return 0;
 }
 
@@ -514,12 +515,22 @@ vy_read_iterator_advance(struct vy_read_iterator *itr)
 	if (stop)
 		goto done;
 
-	for (uint32_t i = itr->mem_src; i < itr->disk_src; i++) {
-		if (vy_read_iterator_scan_mem(itr, i, &next, &stop) != 0)
+	int64_t min_skipped_plsn = INT64_MAX;
+	for (uint32_t i = itr->mem_src; i < itr->disk_src && !stop; i++) {
+		if (vy_read_iterator_scan_mem(itr, i, &next, &stop,
+					      &min_skipped_plsn) != 0)
 			return -1;
-		if (stop)
-			goto done;
 	}
+	if (itr->tx != NULL && min_skipped_plsn != INT64_MAX) {
+		if (vy_tx_send_to_read_view(itr->tx, min_skipped_plsn) != 0)
+			return -1;
+		if (itr->tx->state == VINYL_TX_ABORT) {
+			diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+			return -1;
+		}
+	}
+	if (stop)
+		goto done;
 rescan_disk:
 	/* The following code may yield as it needs to access disk. */
 	vy_read_iterator_pin_slices(itr);
@@ -597,6 +608,7 @@ vy_read_iterator_advance(struct vy_read_iterator *itr)
 	return 0;
 }
 
+/** Add the transaction source to the read iterator. */
 static void
 vy_read_iterator_add_tx(struct vy_read_iterator *itr)
 {
@@ -609,19 +621,21 @@ vy_read_iterator_add_tx(struct vy_read_iterator *itr)
 			     iterator_type, itr->key);
 }
 
+/** Add the cache source to the read iterator. */
 static void
-vy_read_iterator_add_cache(struct vy_read_iterator *itr)
+vy_read_iterator_add_cache(struct vy_read_iterator *itr, bool is_prepared_ok)
 {
 	enum iterator_type iterator_type = (itr->iterator_type != ITER_REQ ?
 					    itr->iterator_type : ITER_LE);
 	struct vy_read_src *sub_src = vy_read_iterator_add_src(itr);
 	vy_cache_iterator_open(&sub_src->cache_iterator, &itr->lsm->cache,
 			       iterator_type, itr->key, itr->read_view,
-			       /*is_prepared_ok=*/true);
+			       is_prepared_ok);
 }
 
+/** Add the memory level source to the read iterator. */
 static void
-vy_read_iterator_add_mem(struct vy_read_iterator *itr)
+vy_read_iterator_add_mem(struct vy_read_iterator *itr, bool is_prepared_ok)
 {
 	enum iterator_type iterator_type = (itr->iterator_type != ITER_REQ ?
 					    itr->iterator_type : ITER_LE);
@@ -633,7 +647,7 @@ vy_read_iterator_add_mem(struct vy_read_iterator *itr)
 	sub_src = vy_read_iterator_add_src(itr);
 	vy_mem_iterator_open(&sub_src->mem_iterator, &lsm->stat.memory.iterator,
 			     lsm->mem, iterator_type, itr->key, itr->read_view,
-			     /*is_prepared_ok=*/true);
+			     is_prepared_ok);
 	/* Add sealed in-memory indexes. */
 	struct vy_mem *mem;
 	rlist_foreach_entry(mem, &lsm->sealed, in_sealed) {
@@ -641,10 +655,11 @@ vy_read_iterator_add_mem(struct vy_read_iterator *itr)
 		vy_mem_iterator_open(&sub_src->mem_iterator,
 				     &lsm->stat.memory.iterator,
 				     mem, iterator_type, itr->key,
-				     itr->read_view, /*is_prepared_ok=*/true);
+				     itr->read_view, is_prepared_ok);
 	}
 }
 
+/** Add the disk level source to the read iterator. */
 static void
 vy_read_iterator_add_disk(struct vy_read_iterator *itr)
 {
@@ -769,16 +784,18 @@ vy_read_iterator_restore(struct vy_read_iterator *itr)
 						    itr->last : itr->key);
 	itr->range_version = itr->curr_range->version;
 
+	bool is_prepared_ok = true;
 	if (itr->tx != NULL) {
+		is_prepared_ok = vy_tx_is_prepared_ok(itr->tx);
 		itr->txw_src = itr->src_count;
 		vy_read_iterator_add_tx(itr);
 	}
 
 	itr->cache_src = itr->src_count;
-	vy_read_iterator_add_cache(itr);
+	vy_read_iterator_add_cache(itr, is_prepared_ok);
 
 	itr->mem_src = itr->src_count;
-	vy_read_iterator_add_mem(itr);
+	vy_read_iterator_add_mem(itr, is_prepared_ok);
 
 	itr->disk_src = itr->src_count;
 	vy_read_iterator_add_disk(itr);
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 10fb115993..7f2af76888 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -158,42 +158,55 @@ vy_tx_manager_mem_used(struct vy_tx_manager *xm)
 }
 
 struct vy_read_view *
-vy_tx_manager_read_view(struct vy_tx_manager *xm)
+vy_tx_manager_read_view(struct vy_tx_manager *xm, int64_t plsn)
 {
+	assert(plsn >= MAX_LSN);
+	/* Look up the last read view with lsn less than the given one. */
 	struct vy_read_view *rv;
+	rlist_foreach_entry_reverse(rv, &xm->read_views, in_read_views) {
+		if (plsn > rv->vlsn)
+			break;
+	}
+	bool rv_exists = !rlist_entry_is_head(rv, &xm->read_views,
+					      in_read_views);
+	/* Look up the last prepared tx with lsn less than the given one. */
+	struct vy_tx *tx;
+	rlist_foreach_entry_reverse(tx, &xm->prepared, in_prepared) {
+		if (plsn > MAX_LSN + tx->psn)
+			break;
+	}
+	bool tx_exists = !rlist_entry_is_head(tx, &xm->prepared, in_prepared);
 	/*
 	 * Check if the last read view can be reused. Reference
 	 * and return it if it's the case.
 	 */
-	struct vy_tx *last_prepared_tx = rlist_empty(&xm->prepared) ? NULL :
-		rlist_last_entry(&xm->prepared, struct vy_tx, in_prepared);
-	if (!rlist_empty(&xm->read_views)) {
-		rv = rlist_last_entry(&xm->read_views, struct vy_read_view,
-				      in_read_views);
-		/** Reuse an existing read view */
-		if ((last_prepared_tx == NULL && rv->vlsn == xm->lsn) ||
-		    (last_prepared_tx != NULL &&
-		     rv->vlsn == MAX_LSN + last_prepared_tx->psn)) {
-
+	if (rv_exists) {
+		if ((!tx_exists && rv->vlsn == xm->lsn) ||
+		    (tx_exists && rv->vlsn == MAX_LSN + tx->psn)) {
 			rv->refs++;
-			return  rv;
+			return rv;
 		}
 	}
+	/*
+	 * Allocate a new read view and insert it into the read view list
+	 * preserving the order.
+	 */
+	struct vy_read_view *prev_rv = rv;
 	rv = mempool_alloc(&xm->read_view_mempool);
 	if (rv == NULL) {
 		diag_set(OutOfMemory, sizeof(*rv),
 			 "mempool", "read view");
 		return NULL;
 	}
-	if (last_prepared_tx != NULL) {
-		rv->vlsn = MAX_LSN + last_prepared_tx->psn;
-		last_prepared_tx->read_view = rv;
+	if (tx_exists) {
+		rv->vlsn = MAX_LSN + tx->psn;
+		tx->read_view = rv;
 		rv->refs = 2;
 	} else {
 		rv->vlsn = xm->lsn;
 		rv->refs = 1;
 	}
-	rlist_add_tail_entry(&xm->read_views, rv, in_read_views);
+	rlist_add_entry(&prev_rv->in_read_views, rv, in_read_views);
 	return rv;
 }
 
@@ -328,6 +341,7 @@ vy_tx_create(struct vy_tx_manager *xm, struct vy_tx *tx)
 	tx->write_set_version = 0;
 	tx->write_size = 0;
 	tx->xm = xm;
+	tx->isolation = TXN_ISOLATION_READ_CONFIRMED;
 	tx->state = VINYL_TX_READY;
 	tx->is_applier_session = false;
 	tx->read_view = (struct vy_read_view *)xm->p_global_read_view;
@@ -379,12 +393,32 @@ vy_tx_is_in_read_view(struct vy_tx *tx)
 	return tx->read_view->vlsn != INT64_MAX;
 }
 
+int
+vy_tx_send_to_read_view(struct vy_tx *tx, int64_t plsn)
+{
+	assert(plsn >= MAX_LSN);
+	assert(tx->state == VINYL_TX_READY);
+	if (tx->read_view->vlsn < plsn)
+		return 0;
+	if (!vy_tx_is_ro(tx)) {
+		vy_tx_abort(tx);
+		return 0;
+	}
+	struct vy_tx_manager *xm = tx->xm;
+	struct vy_read_view *rv = vy_tx_manager_read_view(xm, plsn);
+	if (rv == NULL)
+		return -1;
+	vy_tx_manager_destroy_read_view(xm, tx->read_view);
+	tx->read_view = rv;
+	return 0;
+}
+
 /**
  * Send to read view all transactions that are reading key @v
  * modified by transaction @tx and abort all transactions that are modifying it.
  */
 static int
-vy_tx_send_to_read_view(struct vy_tx *tx, struct txv *v)
+vy_tx_send_readers_to_read_view(struct vy_tx *tx, struct txv *v)
 {
 	struct vy_tx_conflict_iterator it;
 	vy_tx_conflict_iterator_init(&it, &v->lsm->read_set, v->entry);
@@ -396,17 +430,8 @@ vy_tx_send_to_read_view(struct vy_tx *tx, struct txv *v)
 		/* Abort only active TXs */
 		if (abort->state != VINYL_TX_READY)
 			continue;
-		/* already in (earlier) read view */
-		if (vy_tx_is_in_read_view(abort))
-			continue;
-		if (!vy_tx_is_ro(abort)) {
-			vy_tx_abort(abort);
-			continue;
-		}
-		struct vy_read_view *rv = vy_tx_manager_read_view(tx->xm);
-		if (rv == NULL)
+		if (vy_tx_send_to_read_view(abort, INT64_MAX) != 0)
 			return -1;
-		abort->read_view = rv;
 	}
 	return 0;
 }
@@ -433,8 +458,10 @@ vy_tx_abort_readers(struct vy_tx *tx, struct txv *v)
 }
 
 struct vy_tx *
-vy_tx_begin(struct vy_tx_manager *xm)
+vy_tx_begin(struct vy_tx_manager *xm, enum txn_isolation_level isolation)
 {
+	assert(isolation < txn_isolation_level_MAX &&
+	       isolation != TXN_ISOLATION_DEFAULT);
 	struct vy_tx *tx = mempool_alloc(&xm->tx_mempool);
 	if (unlikely(tx == NULL)) {
 		diag_set(OutOfMemory, sizeof(*tx), "mempool", "struct vy_tx");
@@ -446,6 +473,7 @@ vy_tx_begin(struct vy_tx_manager *xm)
 	if (session != NULL && session->type == SESSION_TYPE_APPLIER)
 		tx->is_applier_session = true;
 
+	tx->isolation = isolation;
 	return tx;
 }
 
@@ -678,7 +706,7 @@ vy_tx_prepare(struct vy_tx *tx)
 	struct write_set_iterator it;
 	write_set_ifirst(&tx->write_set, &it);
 	while ((v = write_set_inext(&it)) != NULL) {
-		if (vy_tx_send_to_read_view(tx, v))
+		if (vy_tx_send_readers_to_read_view(tx, v))
 			return -1;
 	}
 
diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h
index e44bc9cbf6..83ba75750a 100644
--- a/src/box/vy_tx.h
+++ b/src/box/vy_tx.h
@@ -42,6 +42,7 @@
 #include "iterator_type.h"
 #include "salad/stailq.h"
 #include "trivia/util.h"
+#include "txn.h"
 #include "vy_entry.h"
 #include "vy_lsm.h"
 #include "vy_stat.h"
@@ -174,6 +175,8 @@ struct vy_tx {
 	 * the write set.
 	 */
 	size_t write_size;
+	/** Transaction isolation level. */
+	enum txn_isolation_level isolation;
 	/** Current state of the transaction.*/
 	enum tx_state state;
 	/** Set if the transaction was started by an applier. */
@@ -210,6 +213,23 @@ vy_tx_read_view(struct vy_tx *tx)
 	return (const struct vy_read_view **)&tx->read_view;
 }
 
+/** Return true if the transaction may see prepared statements. */
+static inline bool
+vy_tx_is_prepared_ok(struct vy_tx *tx)
+{
+	switch (tx->isolation) {
+	case TXN_ISOLATION_READ_COMMITTED:
+		return true;
+	case TXN_ISOLATION_READ_CONFIRMED:
+	case TXN_ISOLATION_LINEARIZABLE:
+		return false;
+	case TXN_ISOLATION_BEST_EFFORT:
+		return !rlist_empty(&tx->in_writers);
+	default:
+		unreachable();
+	}
+}
+
 /** Transaction manager object. */
 struct vy_tx_manager {
 	/**
@@ -291,9 +311,12 @@ vy_tx_manager_delete(struct vy_tx_manager *xm);
 size_t
 vy_tx_manager_mem_used(struct vy_tx_manager *xm);
 
-/** Create or reuse an instance of a read view. */
+/**
+ * Create or reuse an instance of a read view whose vlsn is less than the given
+ * prepared statement LSN. Returns NULL on memory allocation error.
+ */
 struct vy_read_view *
-vy_tx_manager_read_view(struct vy_tx_manager *xm);
+vy_tx_manager_read_view(struct vy_tx_manager *xm, int64_t plsn);
 
 /** Dereference and possibly destroy a read view. */
 void
@@ -330,7 +353,7 @@ vy_tx_destroy(struct vy_tx *tx);
 
 /** Begin a new transaction. */
 struct vy_tx *
-vy_tx_begin(struct vy_tx_manager *xm);
+vy_tx_begin(struct vy_tx_manager *xm, enum txn_isolation_level isolation);
 
 /** Prepare a transaction to be committed. */
 int
@@ -423,6 +446,15 @@ vy_tx_track_point(struct vy_tx *tx, struct vy_lsm *lsm, struct vy_entry entry);
 int
 vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt);
 
+/**
+ * Send an active transaction to a read view such that its vlsn is less than
+ * the given prepared statement LSN. Returns 0 on success, -1 on memory
+ * allocation error. The transaction is aborted immediately if it has any
+ * write statements.
+ */
+int
+vy_tx_send_to_read_view(struct vy_tx *tx, int64_t plsn);
+
 /**
  * Iterator over the write set of a transaction.
  */
diff --git a/test/box-luatest/suite.ini b/test/box-luatest/suite.ini
index dbea6eca1c..31a2c4b832 100644
--- a/test/box-luatest/suite.ini
+++ b/test/box-luatest/suite.ini
@@ -2,5 +2,5 @@
 core = luatest
 description = Database tests
 is_parallel = True
-release_disabled = gh_6819_iproto_watch_not_implemented_test.lua gh_6930_mvcc_net_box_iso_test.lua
+release_disabled = gh_6819_iproto_watch_not_implemented_test.lua
 long_run = gh_7605_qsort_recovery_test.lua gh_7670_memtx_tx_manager_idx_rand_inconsistency_test.lua
diff --git a/test/box-luatest/gh_6930_mvcc_isolation_levels_test.lua b/test/engine-luatest/gh_6930_mvcc_isolation_levels_test.lua
similarity index 77%
rename from test/box-luatest/gh_6930_mvcc_isolation_levels_test.lua
rename to test/engine-luatest/gh_6930_mvcc_isolation_levels_test.lua
index c2fdd45b8a..ce09be0a37 100644
--- a/test/box-luatest/gh_6930_mvcc_isolation_levels_test.lua
+++ b/test/engine-luatest/gh_6930_mvcc_isolation_levels_test.lua
@@ -1,22 +1,22 @@
 local server = require('test.luatest_helpers.server')
 local t = require('luatest')
 
-local g = t.group()
+local g = t.group(nil, {{engine = 'memtx'}, {engine = 'vinyl'}})
 
-g.before_all = function()
-    g.server = server:new{
+g.before_all(function(cg)
+    cg.server = server:new{
         alias   = 'default',
         box_cfg = {memtx_use_mvcc_engine = true}
     }
-    g.server:start()
-end
+    cg.server:start()
+end)
 
-g.after_all = function()
-    g.server:drop()
-end
+g.after_all(function(cg)
+    cg.server:drop()
+end)
 
-g.test_mvcc_isolation_level_errors = function()
-    g.server:exec(function()
+g.test_mvcc_isolation_level_errors = function(cg)
+    cg.server:exec(function()
         local t = require('luatest')
         t.assert_error_msg_content_equals(
             "Illegal parameters, txn_isolation must be one of " ..
@@ -49,15 +49,15 @@ g.test_mvcc_isolation_level_errors = function()
     end)
 end
 
-g.before_test('test_mvcc_isolation_level_basics', function()
-    g.server:exec(function()
-        local s = box.schema.space.create('test')
+g.before_test('test_mvcc_isolation_level_basics', function(cg)
+    cg.server:exec(function(engine)
+        local s = box.schema.space.create('test', {engine = engine})
         s:create_index('primary')
-    end)
+    end, {cg.params.engine})
 end)
 
-g.test_mvcc_isolation_level_basics = function()
-    g.server:exec(function()
+g.test_mvcc_isolation_level_basics = function(cg)
+    cg.server:exec(function()
         local t = require('luatest')
         local fiber = require('fiber')
         local s = box.space.test
@@ -168,8 +168,41 @@ g.test_mvcc_isolation_level_basics = function()
     end)
 end
 
-g.after_test('test_mvcc_isolation_level_basics', function()
-    g.server:exec(function()
+g.after_test('test_mvcc_isolation_level_basics', function(cg)
+    cg.server:exec(function()
+        local s = box.space.test
+        if s then
+            s:drop()
+        end
+    end)
+end)
+
+g.before_test('test_mvcc_best_effort_read_view', function(cg)
+    cg.server:exec(function(engine)
+        local s = box.schema.space.create('test', {engine = engine})
+        s:create_index('primary')
+    end, {cg.params.engine})
+end)
+
+g.test_mvcc_best_effort_read_view = function(cg)
+    cg.server:exec(function()
+        local t = require('luatest')
+        local fiber = require('fiber')
+        local s = box.space.test
+        local ch = fiber.channel(1)
+        fiber.create(function() s:replace{1} end)
+        fiber.create(function() s:replace{2} ch:put(true) end)
+        box.begin({txn_isolation = 'best-effort'})
+        t.assert_equals(s:select(2), {})
+        t.assert(ch:get(5))
+        t.assert_equals(s:select(1), {{1}})
+        box.commit()
+    end)
+end
+
+g.after_test('test_mvcc_best_effort_read_view', function(cg)
+    cg.server:exec(function()
+        box.error.injection.set('ERRINJ_WAL_DELAY', false)
         local s = box.space.test
         if s then
             s:drop()
diff --git a/test/box-luatest/gh_6930_mvcc_net_box_iso_test.lua b/test/engine-luatest/gh_6930_mvcc_net_box_iso_test.lua
similarity index 80%
rename from test/box-luatest/gh_6930_mvcc_net_box_iso_test.lua
rename to test/engine-luatest/gh_6930_mvcc_net_box_iso_test.lua
index d17255d8db..a833c150d5 100644
--- a/test/box-luatest/gh_6930_mvcc_net_box_iso_test.lua
+++ b/test/engine-luatest/gh_6930_mvcc_net_box_iso_test.lua
@@ -1,32 +1,35 @@
+local misc = require('test.luatest_helpers.misc')
 local server = require('test.luatest_helpers.server')
 local t = require('luatest')
 
-local g = t.group()
+local g = t.group(nil, {{engine = 'memtx'}, {engine = 'vinyl'}})
 
-g.before_all = function()
-    g.server = server:new{
+g.before_all(function(cg)
+    cg.server = server:new{
         alias   = 'default',
         box_cfg = {memtx_use_mvcc_engine = true}
     }
-    g.server:start()
-end
+    cg.server:start()
+end)
 
-g.after_all = function()
-    g.server:drop()
-end
+g.after_all(function(cg)
+    cg.server:drop()
+end)
 
-g.before_test('test_mvcc_netbox_isolation_level_basics', function()
-    g.server:exec(function()
-        local s = box.schema.space.create('test')
+g.before_test('test_mvcc_netbox_isolation_level_basics', function(cg)
+    cg.server:exec(function(engine)
+        local s = box.schema.space.create('test', {engine = engine})
         s:create_index('primary')
         box.schema.user.grant('guest', 'read,write', 'space', 'test')
-    end)
+    end, {cg.params.engine})
 end)
 
-g.test_mvcc_netbox_isolation_level_basics = function()
+g.test_mvcc_netbox_isolation_level_basics = function(cg)
+    misc.skip_if_not_debug()
+
     local t = require('luatest')
 
-    g.server:exec(function()
+    cg.server:exec(function()
         local s = box.space.test
         box.error.injection.set('ERRINJ_WAL_DELAY', true)
         local fiber = require('fiber')
@@ -38,7 +41,7 @@ g.test_mvcc_netbox_isolation_level_basics = function()
     end)
 
     local netbox = require('net.box')
-    local conn = netbox.connect(g.server.net_box_uri)
+    local conn = netbox.connect(cg.server.net_box_uri)
 
     t.assert_equals(conn.space.test:select(), {})
     local strm = conn:new_stream()
@@ -59,14 +62,14 @@ g.test_mvcc_netbox_isolation_level_basics = function()
     end
 
     for _,level in pairs(expect0) do
-        g.server:exec(function(cfg_level)
+        cg.server:exec(function(cfg_level)
             box.cfg{txn_isolation = cfg_level}
         end, {level})
         strm:begin()
         t.assert_equals(strm.space.test:select(), {})
         strm:commit()
     end
-    g.server:exec(function()
+    cg.server:exec(function()
         box.cfg{txn_isolation = 'best-effort'}
     end)
 
@@ -81,7 +84,7 @@ g.test_mvcc_netbox_isolation_level_basics = function()
     end
 
     for _,level in pairs(expect1) do
-        g.server:exec(function(cfg_level)
+        cg.server:exec(function(cfg_level)
             box.cfg{txn_isolation = cfg_level}
         end, {level})
         strm:begin()
@@ -91,7 +94,7 @@ g.test_mvcc_netbox_isolation_level_basics = function()
         -- which is always run as read-confirmed
         t.assert_equals(strm.space.test:select(), {})
     end
-    g.server:exec(function()
+    cg.server:exec(function()
         box.cfg{txn_isolation = 'best-effort'}
     end)
 
@@ -109,20 +112,20 @@ g.test_mvcc_netbox_isolation_level_basics = function()
     strm:begin{txn_isolation = 'read-committed'}
     t.assert_equals(strm.space.test:select{1}, {{1}})
     strm.space.test:replace{2}
-    g.server:exec(function()
+    cg.server:exec(function()
         box.error.injection.set('ERRINJ_WAL_DELAY', false)
     end)
     strm:commit()
 
     t.assert_equals(strm.space.test:select{}, {{1}, {2}})
 
-    g.server:exec(function()
+    cg.server:exec(function()
         rawget(_G, 'f'):join()
     end)
 end
 
-g.after_test('test_mvcc_netbox_isolation_level_basics', function()
-    g.server:exec(function()
+g.after_test('test_mvcc_netbox_isolation_level_basics', function(cg)
+    cg.server:exec(function()
         box.error.injection.set('ERRINJ_WAL_DELAY', false)
         local s = box.space.test
         if s then
diff --git a/test/replication/gh-5167-qsync-rollback-snap.result b/test/replication/gh-5167-qsync-rollback-snap.result
index 13166720f6..85ef58612e 100644
--- a/test/replication/gh-5167-qsync-rollback-snap.result
+++ b/test/replication/gh-5167-qsync-rollback-snap.result
@@ -71,7 +71,20 @@ test_run:switch('replica')
 fiber = require('fiber')
  | ---
  | ...
-test_run:wait_cond(function() return box.space.sync:count() == 1 end)
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function()
+    box.begin({txn_isolation = 'read-committed'})
+    local ret = box.space.sync:count()
+    box.commit()
+    return ret == 1
+end);
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter ''");
  | ---
  | - true
  | ...
diff --git a/test/replication/gh-5167-qsync-rollback-snap.test.lua b/test/replication/gh-5167-qsync-rollback-snap.test.lua
index 1a2a31b7c7..5e8950eb91 100644
--- a/test/replication/gh-5167-qsync-rollback-snap.test.lua
+++ b/test/replication/gh-5167-qsync-rollback-snap.test.lua
@@ -29,7 +29,14 @@ end)
 
 test_run:switch('replica')
 fiber = require('fiber')
-test_run:wait_cond(function() return box.space.sync:count() == 1 end)
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function()
+    box.begin({txn_isolation = 'read-committed'})
+    local ret = box.space.sync:count()
+    box.commit()
+    return ret == 1
+end);
+test_run:cmd("setopt delimiter ''");
 -- Snapshot will stuck in WAL thread on rotation before starting wait on the
 -- limbo.
 box.error.injection.set("ERRINJ_WAL_DELAY", true)
diff --git a/test/replication/qsync_snapshots.result b/test/replication/qsync_snapshots.result
index ca418b168d..742a56fce0 100644
--- a/test/replication/qsync_snapshots.result
+++ b/test/replication/qsync_snapshots.result
@@ -213,7 +213,20 @@ fiber = require('fiber')
 box.cfg{replication_synchro_timeout=1000}
  | ---
  | ...
-test_run:wait_cond(function() return box.space.sync:count() == 1 end)
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function()
+    box.begin({txn_isolation = 'read-committed'})
+    local ret = box.space.sync:count()
+    box.commit()
+    return ret == 1
+end);
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter ''");
  | ---
  | - true
  | ...
diff --git a/test/replication/qsync_snapshots.test.lua b/test/replication/qsync_snapshots.test.lua
index 82c2e3f7c5..5208c26d4e 100644
--- a/test/replication/qsync_snapshots.test.lua
+++ b/test/replication/qsync_snapshots.test.lua
@@ -100,7 +100,14 @@ end)
 test_run:switch('replica')
 fiber = require('fiber')
 box.cfg{replication_synchro_timeout=1000}
-test_run:wait_cond(function() return box.space.sync:count() == 1 end)
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function()
+    box.begin({txn_isolation = 'read-committed'})
+    local ret = box.space.sync:count()
+    box.commit()
+    return ret == 1
+end);
+test_run:cmd("setopt delimiter ''");
 ok, err = nil
 f = fiber.create(function() ok, err = pcall(box.snapshot) end)
 
diff --git a/test/sql/errinj.result b/test/sql/errinj.result
index f19203bc62..a57a9008c7 100644
--- a/test/sql/errinj.result
+++ b/test/sql/errinj.result
@@ -137,6 +137,12 @@ box.execute('drop table test')
 -- policy, SQL responses could be corrupted, when DDL/DML is mixed
 -- with DQL. Same as gh-3255.
 --
+txn_isolation_default = box.cfg.txn_isolation
+---
+...
+box.cfg{txn_isolation = 'read-committed'}
+---
+...
 box.execute('CREATE TABLE test (id integer primary key)')
 ---
 - row_count: 1
@@ -171,6 +177,9 @@ box.execute('DROP TABLE test')
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
+box.cfg{txn_isolation = txn_isolation_default}
+---
+...
 ----
 ---- gh-3273: Move SQL TRIGGERs into server.
 ----
diff --git a/test/sql/errinj.test.lua b/test/sql/errinj.test.lua
index 72e96a9255..5436e3b3a1 100644
--- a/test/sql/errinj.test.lua
+++ b/test/sql/errinj.test.lua
@@ -48,6 +48,9 @@ box.execute('drop table test')
 -- policy, SQL responses could be corrupted, when DDL/DML is mixed
 -- with DQL. Same as gh-3255.
 --
+txn_isolation_default = box.cfg.txn_isolation
+box.cfg{txn_isolation = 'read-committed'}
+
 box.execute('CREATE TABLE test (id integer primary key)')
 cn = remote.connect(box.cfg.listen)
 
@@ -60,6 +63,7 @@ errinj.set("ERRINJ_IPROTO_TX_DELAY", false)
 
 box.execute('DROP TABLE test')
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+box.cfg{txn_isolation = txn_isolation_default}
 
 ----
 ---- gh-3273: Move SQL TRIGGERs into server.
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index a13006939b..18d10b077b 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -766,6 +766,9 @@ wait_replace = true
 _ = fiber.create(function() s:replace{1, 1} wait_replace = false end)
 ---
 ...
+box.begin({txn_isolation = 'read-committed'})
+---
+...
 gen,param,state = s:pairs({1}, {iterator = 'GE'})
 ---
 ...
@@ -790,6 +793,9 @@ value
 ---
 - [2, 0]
 ...
+box.commit()
+---
+...
 s:drop()
 ---
 ...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 4538d794b8..d698b44084 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -274,6 +274,7 @@ s:select{0}
 errinj.set("ERRINJ_WAL_DELAY", true)
 wait_replace = true
 _ = fiber.create(function() s:replace{1, 1} wait_replace = false end)
+box.begin({txn_isolation = 'read-committed'})
 gen,param,state = s:pairs({1}, {iterator = 'GE'})
 state, value = gen(param, state)
 value
@@ -281,6 +282,7 @@ errinj.set("ERRINJ_WAL_DELAY", false)
 while wait_replace do fiber.sleep(0.01) end
 state, value = gen(param, state)
 value
+box.commit()
 s:drop()
 
 --
diff --git a/test/vinyl/errinj_tx.result b/test/vinyl/errinj_tx.result
index 7d59d15668..101193a478 100644
--- a/test/vinyl/errinj_tx.result
+++ b/test/vinyl/errinj_tx.result
@@ -13,6 +13,12 @@ create_iterator = require('utils').create_iterator
 errinj = box.error.injection
 ---
 ...
+txn_isolation_default = box.cfg.txn_isolation
+---
+...
+box.cfg{txn_isolation = 'read-committed'}
+---
+...
 --
 -- gh-1681: vinyl: crash in vy_rollback on ER_WAL_WRITE
 --
@@ -460,6 +466,9 @@ _ = fiber.create(function() pcall(s.replace, s, {1}) ch:put(true) end)
 ---
 ...
 -- Read the tuple from another transaction.
+box.begin({txn_isolation = 'read-committed'})
+---
+...
 itr = create_iterator(s)
 ---
 ...
@@ -519,6 +528,9 @@ c:commit()
 itr = nil
 ---
 ...
+box.rollback()
+---
+...
 s:drop()
 ---
 ...
@@ -532,3 +544,6 @@ box.stat.vinyl().tx.read_views -- 0
 ---
 - 0
 ...
+box.cfg{txn_isolation = txn_isolation_default}
+---
+...
diff --git a/test/vinyl/errinj_tx.test.lua b/test/vinyl/errinj_tx.test.lua
index c434d92ff3..24547b7e6b 100644
--- a/test/vinyl/errinj_tx.test.lua
+++ b/test/vinyl/errinj_tx.test.lua
@@ -4,6 +4,9 @@ txn_proxy = require('txn_proxy')
 create_iterator = require('utils').create_iterator
 errinj = box.error.injection
 
+txn_isolation_default = box.cfg.txn_isolation
+box.cfg{txn_isolation = 'read-committed'}
+
 --
 -- gh-1681: vinyl: crash in vy_rollback on ER_WAL_WRITE
 --
@@ -208,6 +211,7 @@ errinj.set('ERRINJ_WAL_DELAY', true)
 _ = fiber.create(function() pcall(s.replace, s, {1}) ch:put(true) end)
 
 -- Read the tuple from another transaction.
+box.begin({txn_isolation = 'read-committed'})
 itr = create_iterator(s)
 itr.next()
 
@@ -229,9 +233,12 @@ c('s:replace{1}')
 c:commit()
 
 itr = nil
+box.rollback()
 s:drop()
 
 -- Collect all iterators to make sure no read views are left behind,
 -- as they might disrupt the following test run.
 collectgarbage()
 box.stat.vinyl().tx.read_views -- 0
+
+box.cfg{txn_isolation = txn_isolation_default}
diff --git a/test/vinyl/gh-3395-read-prepared-uncommitted.result b/test/vinyl/gh-3395-read-prepared-uncommitted.result
index 7dd22a1a9b..17ef990aba 100644
--- a/test/vinyl/gh-3395-read-prepared-uncommitted.result
+++ b/test/vinyl/gh-3395-read-prepared-uncommitted.result
@@ -41,17 +41,29 @@ box.snapshot()
  | - ok
  | ...
 
-c = fiber.channel(1)
+c1 = fiber.channel(1)
+ | ---
+ | ...
+c2 = fiber.channel(1)
  | ---
  | ...
 
 function do_write() s:replace{1, 2} end
  | ---
  | ...
-function init_read() end
+
+test_run:cmd("setopt delimiter ';'")
  | ---
+ | - true
  | ...
-function do_read() local ret = sk:select{2} c:put(ret) end
+function do_read()
+    c1:get()
+    c2:put(true)
+    box.begin({txn_isolation = 'read-committed'})
+    local ret = sk:select{2}
+    box.commit()
+    c2:put(ret)
+end;
  | ---
  | ...
 
@@ -68,10 +80,6 @@ function do_read() local ret = sk:select{2} c:put(ret) end
 -- mem doesn't change and the statement is returned in the result set
 -- (i.e. dirty read takes place).
 --
-test_run:cmd("setopt delimiter ';'");
- | ---
- | - true
- | ...
 -- is_tx_faster_than_wal determines whether wal thread has time
 -- to finish its routine or not. In the first case we add extra
 -- time gap to make sure that  WAL thread finished work and
@@ -79,10 +87,11 @@ test_run:cmd("setopt delimiter ';'");
 --
 function read_prepared_with_delay(is_tx_faster_than_wal)
     errinj.set("ERRINJ_WAL_DELAY", true)
-    fiber.create(do_write, s)
-    init_read()
+    fiber.create(do_write)
+    fiber.create(do_read)
     errinj.set("ERRINJ_VY_READ_PAGE_DELAY", true)
-    fiber.create(do_read, sk, c)
+    c1:put(true)
+    c2:get()
     errinj.set("ERRINJ_WAL_WRITE", true)
     if is_tx_faster_than_wal then
         errinj.set("ERRINJ_RELAY_FASTER_THAN_TX", true)
@@ -90,7 +99,7 @@ function read_prepared_with_delay(is_tx_faster_than_wal)
     errinj.set("ERRINJ_WAL_DELAY", false)
     fiber.sleep(0.1)
     errinj.set("ERRINJ_VY_READ_PAGE_DELAY", false)
-    local res = c:get()
+    local res = c2:get()
     errinj.set("ERRINJ_WAL_WRITE", false)
     if is_tx_faster_than_wal then
         errinj.set("ERRINJ_RELAY_FASTER_THAN_TX", false)
@@ -188,12 +197,27 @@ state = nil
 function do_write() s:replace{3, 20} end
  | ---
  | ...
-function init_read() gen, param, state = sk:pairs({20}, {iterator = box.index.EQ}) gen(param, state) end
+
+test_run:cmd("setopt delimiter ';'")
  | ---
+ | - true
  | ...
-function do_read() local _, ret = gen(param, state) c:put(ret) end
+function do_read()
+    box.begin({txn_isolation = 'read-committed'})
+    gen, param, state = sk:pairs({20}, {iterator = box.index.EQ})
+    c1:get()
+    c2:put(true)
+    gen(param, state)
+    local _, ret = gen(param, state)
+    box.commit()
+    c2:put(ret)
+end;
  | ---
  | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
 
 read_prepared_with_delay(false)
  | ---
@@ -215,7 +239,7 @@ fiber.sleep(0.1)
 --
 gen(param, state)
  | ---
- | - error: The read view is aborted
+ | - error: The transaction the cursor belongs to has ended
  | ...
 
 fiber.sleep(0.1)
diff --git a/test/vinyl/gh-3395-read-prepared-uncommitted.test.lua b/test/vinyl/gh-3395-read-prepared-uncommitted.test.lua
index 70136ddff0..afa70ad2f2 100644
--- a/test/vinyl/gh-3395-read-prepared-uncommitted.test.lua
+++ b/test/vinyl/gh-3395-read-prepared-uncommitted.test.lua
@@ -19,11 +19,20 @@ s:replace{3, 2}
 s:replace{2, 2}
 box.snapshot()
 
-c = fiber.channel(1)
+c1 = fiber.channel(1)
+c2 = fiber.channel(1)
 
 function do_write() s:replace{1, 2} end
-function init_read() end
-function do_read() local ret = sk:select{2} c:put(ret) end
+
+test_run:cmd("setopt delimiter ';'")
+function do_read()
+    c1:get()
+    c2:put(true)
+    box.begin({txn_isolation = 'read-committed'})
+    local ret = sk:select{2}
+    box.commit()
+    c2:put(ret)
+end;
 
 -- Since we have tuples stored on disk, read procedure may
 -- yield, opening window for WAL thread to commit or rollback
@@ -38,7 +47,6 @@ function do_read() local ret = sk:select{2} c:put(ret) end
 -- mem doesn't change and the statement is returned in the result set
 -- (i.e. dirty read takes place).
 --
-test_run:cmd("setopt delimiter ';'");
 -- is_tx_faster_than_wal determines whether wal thread has time
 -- to finish its routine or not. In the first case we add extra
 -- time gap to make sure that  WAL thread finished work and
@@ -46,10 +54,11 @@ test_run:cmd("setopt delimiter ';'");
 --
 function read_prepared_with_delay(is_tx_faster_than_wal)
     errinj.set("ERRINJ_WAL_DELAY", true)
-    fiber.create(do_write, s)
-    init_read()
+    fiber.create(do_write)
+    fiber.create(do_read)
     errinj.set("ERRINJ_VY_READ_PAGE_DELAY", true)
-    fiber.create(do_read, sk, c)
+    c1:put(true)
+    c2:get()
     errinj.set("ERRINJ_WAL_WRITE", true)
     if is_tx_faster_than_wal then
         errinj.set("ERRINJ_RELAY_FASTER_THAN_TX", true)
@@ -57,7 +66,7 @@ function read_prepared_with_delay(is_tx_faster_than_wal)
     errinj.set("ERRINJ_WAL_DELAY", false)
     fiber.sleep(0.1)
     errinj.set("ERRINJ_VY_READ_PAGE_DELAY", false)
-    local res = c:get()
+    local res = c2:get()
     errinj.set("ERRINJ_WAL_WRITE", false)
     if is_tx_faster_than_wal then
         errinj.set("ERRINJ_RELAY_FASTER_THAN_TX", false)
@@ -104,8 +113,19 @@ gen = nil
 param = nil
 state = nil
 function do_write() s:replace{3, 20} end
-function init_read() gen, param, state = sk:pairs({20}, {iterator = box.index.EQ}) gen(param, state) end
-function do_read() local _, ret = gen(param, state) c:put(ret) end
+
+test_run:cmd("setopt delimiter ';'")
+function do_read()
+    box.begin({txn_isolation = 'read-committed'})
+    gen, param, state = sk:pairs({20}, {iterator = box.index.EQ})
+    c1:get()
+    c2:put(true)
+    gen(param, state)
+    local _, ret = gen(param, state)
+    box.commit()
+    c2:put(ret)
+end;
+test_run:cmd("setopt delimiter ''");
 
 read_prepared_with_delay(false)
 -- All the same but test second scenario (WAL thread is not finished
diff --git a/test/vinyl/upsert.result b/test/vinyl/upsert.result
index f4c7ffbf0e..2db448c56c 100644
--- a/test/vinyl/upsert.result
+++ b/test/vinyl/upsert.result
@@ -824,7 +824,11 @@ test_run:cmd("setopt delimiter ';'")
 ---
 - true
 ...
-_ = fiber.create(function() ch:put(s:select()) end)
+_ = fiber.create(function()
+    box.begin({txn_isolation = 'read-committed'})
+    ch:put(s:select())
+    box.commit()
+end)
 s:upsert({10, 10}, {{'+', 2, 10}})
 test_run:cmd("setopt delimiter ''");
 ---
diff --git a/test/vinyl/upsert.test.lua b/test/vinyl/upsert.test.lua
index 02eeba8dfa..4d0e35920f 100644
--- a/test/vinyl/upsert.test.lua
+++ b/test/vinyl/upsert.test.lua
@@ -339,7 +339,11 @@ box.snapshot()
 s:get(10) -- add [10, 10] to the cache
 ch = fiber.channel(1)
 test_run:cmd("setopt delimiter ';'")
-_ = fiber.create(function() ch:put(s:select()) end)
+_ = fiber.create(function()
+    box.begin({txn_isolation = 'read-committed'})
+    ch:put(s:select())
+    box.commit()
+end)
 s:upsert({10, 10}, {{'+', 2, 10}})
 test_run:cmd("setopt delimiter ''");
 ch:get() -- should see the UPSERT and return [10, 20]
-- 
GitLab