From 0986bd99870694a2f677a2448cf2e26cd6d57373 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov@tarantool.org>
Date: Mon, 10 Oct 2022 18:43:14 +0300
Subject: [PATCH] vinyl: allow to skip prepared statements in mem iterator

To implement read-confirmed and best-effort isolation levels, we need
to skip unconfirmed (aka prepared) statements in the mem iterator. To
achieve that, we add a new flag is_prepared_ok. Unless the flag is set,
the iterator will skip prepared statements even if they are visible from
the iterator read view. Upon skipping a prepared statement, the iterator
updates min_skipped_plsn if the LSN of the skipped statement is less
than the current value. We'll use this LSN to update the transaction
read view accordingly.

Needed for #5522

NO_DOC=internal
NO_CHANGELOG=internal
---
 src/box/vy_mem.c           |  29 +++-
 src/box/vy_mem.h           |  16 ++-
 src/box/vy_point_lookup.c  |   2 +-
 src/box/vy_read_iterator.c |   5 +-
 test/unit/vy_mem.c         | 274 ++++++++++++++++++++++++++++++++++++-
 5 files changed, 315 insertions(+), 11 deletions(-)

diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index 7ede22430d..273c9b7fe4 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -294,6 +294,24 @@ vy_mem_iterator_step(struct vy_mem_iterator *itr)
 	return 0;
 }
 
+/**
+ * Check if the iterator must step over the statement it points to: it
+ * has VY_STMT_SKIP_READ set, or it is prepared and is_prepared_ok is
+ * unset. In the latter case min_skipped_plsn is capped at its LSN.
+ */
+static inline bool
+vy_mem_iterator_should_skip_curr(struct vy_mem_iterator *itr)
+{
+	struct tuple *stmt = itr->curr.stmt;
+	if ((vy_stmt_flags(stmt) & VY_STMT_SKIP_READ) != 0)
+		return true;
+	if (itr->is_prepared_ok || !vy_stmt_is_prepared(stmt))
+		return false;
+	if (vy_stmt_lsn(stmt) < itr->min_skipped_plsn)
+		itr->min_skipped_plsn = vy_stmt_lsn(stmt);
+	return true;
+}
+
 /**
  * Find next record with lsn <= itr->lsn record.
  * Current position must be at the beginning of serie of records with the
@@ -312,7 +330,7 @@ vy_mem_iterator_find_lsn(struct vy_mem_iterator *itr)
 					       &itr->curr_pos)));
 	struct key_def *cmp_def = itr->mem->cmp_def;
 	while (vy_stmt_lsn(itr->curr.stmt) > (**itr->read_view).vlsn ||
-	       vy_stmt_flags(itr->curr.stmt) & VY_STMT_SKIP_READ) {
+	       vy_mem_iterator_should_skip_curr(itr)) {
 		if (vy_mem_iterator_step(itr) != 0 ||
 		    (itr->iterator_type == ITER_EQ &&
 		     vy_entry_compare(itr->key, itr->curr, cmp_def))) {
@@ -358,9 +376,7 @@ vy_mem_iterator_find_lsn(struct vy_mem_iterator *itr)
 	assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
 	itr->curr = *vy_mem_tree_iterator_get_elem(&itr->mem->tree,
 						   &itr->curr_pos);
-
-	/* Skip VY_STMT_SKIP_READ statements, if any. */
-	while (vy_stmt_flags(itr->curr.stmt) & VY_STMT_SKIP_READ) {
+	while (vy_mem_iterator_should_skip_curr(itr)) {
 		vy_mem_tree_iterator_next(&itr->mem->tree, &itr->curr_pos);
 		assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
 		itr->curr = *vy_mem_tree_iterator_get_elem(&itr->mem->tree,
@@ -441,7 +457,8 @@ vy_mem_iterator_seek(struct vy_mem_iterator *itr, struct vy_entry last)
 void
 vy_mem_iterator_open(struct vy_mem_iterator *itr, struct vy_mem_iterator_stat *stat,
 		     struct vy_mem *mem, enum iterator_type iterator_type,
-		     struct vy_entry key, const struct vy_read_view **rv)
+		     struct vy_entry key, const struct vy_read_view **rv,
+		     bool is_prepared_ok)
 {
 	itr->stat = stat;
 
@@ -456,6 +473,8 @@ vy_mem_iterator_open(struct vy_mem_iterator *itr, struct vy_mem_iterator_stat *s
 	itr->curr = vy_entry_none();
 
 	itr->search_started = false;
+	itr->is_prepared_ok = is_prepared_ok;
+	itr->min_skipped_plsn = INT64_MAX;
 }
 
 /*
diff --git a/src/box/vy_mem.h b/src/box/vy_mem.h
index 4f06c755fd..fa1bce3362 100644
--- a/src/box/vy_mem.h
+++ b/src/box/vy_mem.h
@@ -364,6 +364,19 @@ struct vy_mem_iterator {
 
 	/* Is false until first .._next_.. method is called */
 	bool search_started;
+	/**
+	 * The iterator may return prepared (unconfirmed) statements only if
+	 * this flag is set. If any prepared statements are skipped because of
+	 * this flag, min_skipped_plsn will be set to the min LSN among all
+	 * skipped prepared statements. The transaction is supposed to update
+	 * its read view accordingly to guarantee serializability.
+	 */
+	bool is_prepared_ok;
+	/**
+	 * Initialized to INT64_MAX. Set to the min LSN among all skipped
+	 * prepared statements if is_prepared_ok is false.
+	 */
+	int64_t min_skipped_plsn;
 };
 
 /**
@@ -372,7 +385,8 @@ struct vy_mem_iterator {
 void
 vy_mem_iterator_open(struct vy_mem_iterator *itr, struct vy_mem_iterator_stat *stat,
 		     struct vy_mem *mem, enum iterator_type iterator_type,
-		     struct vy_entry key, const struct vy_read_view **rv);
+		     struct vy_entry key, const struct vy_read_view **rv,
+		     bool is_prepared_ok);
 
 /**
  * Advance a mem iterator to the next key.
diff --git a/src/box/vy_point_lookup.c b/src/box/vy_point_lookup.c
index 80b5c59334..8d73328f25 100644
--- a/src/box/vy_point_lookup.c
+++ b/src/box/vy_point_lookup.c
@@ -97,7 +97,7 @@ vy_point_lookup_scan_mem(struct vy_lsm *lsm, struct vy_mem *mem,
 {
 	struct vy_mem_iterator mem_itr;
 	vy_mem_iterator_open(&mem_itr, &lsm->stat.memory.iterator,
-			     mem, ITER_EQ, key, rv);
+			     mem, ITER_EQ, key, rv, /*is_prepared_ok=*/true);
 	struct vy_history mem_history;
 	vy_history_create(&mem_history, &lsm->env->history_node_pool);
 	int rc = vy_mem_iterator_next(&mem_itr, &mem_history);
diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c
index fc060aee66..08dadaf03a 100644
--- a/src/box/vy_read_iterator.c
+++ b/src/box/vy_read_iterator.c
@@ -632,7 +632,8 @@ vy_read_iterator_add_mem(struct vy_read_iterator *itr)
 	assert(lsm->mem != NULL);
 	sub_src = vy_read_iterator_add_src(itr);
 	vy_mem_iterator_open(&sub_src->mem_iterator, &lsm->stat.memory.iterator,
-			     lsm->mem, iterator_type, itr->key, itr->read_view);
+			     lsm->mem, iterator_type, itr->key, itr->read_view,
+			     /*is_prepared_ok=*/true);
 	/* Add sealed in-memory indexes. */
 	struct vy_mem *mem;
 	rlist_foreach_entry(mem, &lsm->sealed, in_sealed) {
@@ -640,7 +641,7 @@ vy_read_iterator_add_mem(struct vy_read_iterator *itr)
 		vy_mem_iterator_open(&sub_src->mem_iterator,
 				     &lsm->stat.memory.iterator,
 				     mem, iterator_type, itr->key,
-				     itr->read_view);
+				     itr->read_view, /*is_prepared_ok=*/true);
 	}
 }
 
diff --git a/test/unit/vy_mem.c b/test/unit/vy_mem.c
index 439e3684b4..389af6255d 100644
--- a/test/unit/vy_mem.c
+++ b/test/unit/vy_mem.c
@@ -171,7 +171,7 @@ test_iterator_restore_after_insertion(void)
 		const struct vy_read_view *prv = &rv;
 		vy_mem_iterator_open(&itr, &stats, mem,
 				     direct ? ITER_GE : ITER_LE, select_key,
-				     &prv);
+				     &prv, /*is_prepared_ok=*/true);
 		struct vy_entry e;
 		struct vy_history history;
 		vy_history_create(&history, &history_node_pool);
@@ -297,12 +297,281 @@ test_iterator_restore_after_insertion(void)
 	footer();
 }
 
+/** Format an LSN for a test message: INT64_MAX, MAX_LSN+N, or N. */
+static const char *
+lsn_str(int64_t lsn)
+{
+	char *str = tt_static_buf();
+	if (lsn == INT64_MAX)
+		return "INT64_MAX";
+	if (lsn > MAX_LSN)
+		snprintf(str, TT_STATIC_BUF_LEN, "MAX_LSN+%lld",
+			 (long long)(lsn - MAX_LSN));
+	else
+		snprintf(str, TT_STATIC_BUF_LEN, "%lld", (long long)lsn);
+	return str;
+}
+
+/** Return the short name ("EQ", "GE", ...) of a supported iterator type. */
+static const char *
+iterator_type_str(int type)
+{
+	switch (type) {
+	case ITER_LT: return "LT";
+	case ITER_LE: return "LE";
+	case ITER_EQ: return "EQ";
+	case ITER_GE: return "GE";
+	case ITER_GT: return "GT";
+	default: unreachable();
+	}
+}
+
+struct test_iterator_expected {
+	struct vy_stmt_template stmt; /* statement the iterator must return */
+	int64_t min_skipped_plsn; /* iterator's min_skipped_plsn at that step */
+};
+
+/** Run a mem iterator and compare its output against the expected list. */
+static void
+test_iterator_helper(struct vy_mem *mem, enum iterator_type type,
+		     const struct vy_stmt_template *key_template,
+		     int64_t vlsn, bool is_prepared_ok,
+		     const struct test_iterator_expected *expected,
+		     int expected_count, int64_t min_skipped_plsn)
+{
+	struct vy_read_view rv;
+	rv.vlsn = vlsn;
+	const struct vy_read_view *prv = &rv;
+	struct vy_mem_iterator_stat stat;
+	memset(&stat, 0, sizeof(stat));
+	struct vy_history history;
+	vy_history_create(&history, &history_node_pool);
+	struct vy_entry key = vy_new_simple_stmt(format, key_def, key_template);
+	struct vy_mem_iterator it;
+	vy_mem_iterator_open(&it, &stat, mem, type, key, &prv, is_prepared_ok);
+	int n = 0;
+	for (;; n++) {
+		fail_unless(vy_mem_iterator_next(&it, &history) == 0);
+		struct vy_entry entry = vy_history_last_stmt(&history);
+		/* No statement in the history: the iteration is over. */
+		if (vy_entry_is_equal(entry, vy_entry_none()))
+			break;
+		ok(n < expected_count &&
+		   it.min_skipped_plsn == expected[n].min_skipped_plsn &&
+		   vy_stmt_are_same(entry, &expected[n].stmt, format, key_def),
+		   "type=%s key=%s vlsn=%s min_skipped_plsn=%s stmt=%s",
+		   iterator_type_str(type), tuple_str(key.stmt), lsn_str(vlsn),
+		   lsn_str(it.min_skipped_plsn), vy_stmt_str(entry.stmt));
+	}
+	ok(n == expected_count && it.min_skipped_plsn == min_skipped_plsn,
+	   "type=%s key=%s vlsn=%s min_skipped_plsn=%s eof",
+	   iterator_type_str(type), tuple_str(key.stmt), lsn_str(vlsn),
+	   lsn_str(it.min_skipped_plsn));
+	vy_mem_iterator_close(&it);
+	vy_history_cleanup(&history);
+	tuple_unref(key.stmt);
+}
+
+static void
+test_iterator_skip_prepared(void)
+{
+	header();
+	plan(44); /* per case: one check per statement + one at eof */
+	struct vy_stmt_template stmt_templates[] = {
+		STMT_TEMPLATE(10, REPLACE, 100, 1),
+		STMT_TEMPLATE(20, REPLACE, 100, 2),
+		STMT_TEMPLATE(MAX_LSN + 10, REPLACE, 100, 3),
+		STMT_TEMPLATE(MAX_LSN + 20, REPLACE, 100, 4),
+		STMT_TEMPLATE(15, REPLACE, 200, 1),
+		STMT_TEMPLATE(25, REPLACE, 200, 2),
+		STMT_TEMPLATE(MAX_LSN + 15, REPLACE, 300, 1),
+		STMT_TEMPLATE(MAX_LSN + 5, REPLACE, 400, 1),
+		STMT_TEMPLATE(MAX_LSN + 25, REPLACE, 400, 2),
+		STMT_TEMPLATE_FLAGS(10, REPLACE, VY_STMT_SKIP_READ, 500, 1),
+		STMT_TEMPLATE_FLAGS(15, REPLACE, VY_STMT_SKIP_READ, 500, 2),
+		STMT_TEMPLATE_FLAGS(5, REPLACE, VY_STMT_SKIP_READ, 600, 1),
+		STMT_TEMPLATE(10, REPLACE, 600, 2),
+		STMT_TEMPLATE_FLAGS(15, REPLACE, VY_STMT_SKIP_READ, 600, 3),
+		STMT_TEMPLATE(30, REPLACE, 600, 4),
+		STMT_TEMPLATE_FLAGS(45, REPLACE, VY_STMT_SKIP_READ, 600, 5),
+		STMT_TEMPLATE(MAX_LSN + 20, REPLACE, 600, 5),
+	};
+	struct vy_mem *mem = create_test_mem(key_def);
+	for (int i = 0; i < (int)lengthof(stmt_templates); i++) {
+		vy_mem_insert_template(mem, &stmt_templates[i]);
+	}
+	/* type=GE key=100 vlsn=20 is_prepared_ok=false */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 100);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(20, REPLACE, 100, 2), INT64_MAX},
+			{STMT_TEMPLATE(15, REPLACE, 200, 1), INT64_MAX},
+			{STMT_TEMPLATE(10, REPLACE, 600, 2), INT64_MAX},
+		};
+		test_iterator_helper(mem, ITER_GE, &key, /*vlsn=*/20,
+				     /*is_prepared_ok=*/false,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/INT64_MAX);
+	}
+	/* type=GE key=100 vlsn=MAX_LSN+1 is_prepared_ok=false */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 100);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(20, REPLACE, 100, 2), INT64_MAX},
+			{STMT_TEMPLATE(25, REPLACE, 200, 2), INT64_MAX},
+			{STMT_TEMPLATE(30, REPLACE, 600, 4), INT64_MAX},
+		};
+		test_iterator_helper(mem, ITER_GE, &key, /*vlsn=*/MAX_LSN + 1,
+				     /*is_prepared_ok=*/false,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/INT64_MAX);
+	}
+	/* type=GE key=100 vlsn=MAX_LSN+20 is_prepared_ok=false */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 100);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(20, REPLACE, 100, 2), MAX_LSN + 10},
+			{STMT_TEMPLATE(25, REPLACE, 200, 2), MAX_LSN + 10},
+			{STMT_TEMPLATE(30, REPLACE, 600, 4), MAX_LSN + 5},
+		};
+		test_iterator_helper(mem, ITER_GE, &key, /*vlsn=*/MAX_LSN + 20,
+				     /*is_prepared_ok=*/false,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/MAX_LSN + 5);
+	}
+	/* type=GE key=100 vlsn=MAX_LSN+20 is_prepared_ok=true */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 100);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(MAX_LSN + 20, REPLACE, 100, 4),
+				INT64_MAX},
+			{STMT_TEMPLATE(25, REPLACE, 200, 2), INT64_MAX},
+			{STMT_TEMPLATE(MAX_LSN + 15, REPLACE, 300, 1),
+				INT64_MAX},
+			{STMT_TEMPLATE(MAX_LSN + 5, REPLACE, 400, 1),
+				INT64_MAX},
+			{STMT_TEMPLATE(MAX_LSN + 20, REPLACE, 600, 5),
+				INT64_MAX},
+		};
+		test_iterator_helper(mem, ITER_GE, &key, /*vlsn=*/MAX_LSN + 20,
+				     /*is_prepared_ok=*/true,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/INT64_MAX);
+	}
+	/* type=LT key=1000 vlsn=20 is_prepared_ok=false */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 1000);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(10, REPLACE, 600, 2), INT64_MAX},
+			{STMT_TEMPLATE(15, REPLACE, 200, 1), INT64_MAX},
+			{STMT_TEMPLATE(20, REPLACE, 100, 2), INT64_MAX},
+		};
+		test_iterator_helper(mem, ITER_LT, &key, /*vlsn=*/20,
+				     /*is_prepared_ok=*/false,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/INT64_MAX);
+	}
+	/* type=LT key=1000 vlsn=MAX_LSN+1 is_prepared_ok=false */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 1000);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(30, REPLACE, 600, 4), INT64_MAX},
+			{STMT_TEMPLATE(25, REPLACE, 200, 2), INT64_MAX},
+			{STMT_TEMPLATE(20, REPLACE, 100, 2), INT64_MAX},
+		};
+		test_iterator_helper(mem, ITER_LT, &key, /*vlsn=*/MAX_LSN + 1,
+				     /*is_prepared_ok=*/false,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/INT64_MAX);
+	}
+	/* type=LT key=1000 vlsn=MAX_LSN+20 is_prepared_ok=false */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 1000);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(30, REPLACE, 600, 4), MAX_LSN + 20},
+			{STMT_TEMPLATE(25, REPLACE, 200, 2), MAX_LSN + 5},
+			{STMT_TEMPLATE(20, REPLACE, 100, 2), MAX_LSN + 5},
+		};
+		test_iterator_helper(mem, ITER_LT, &key, /*vlsn=*/MAX_LSN + 20,
+				     /*is_prepared_ok=*/false,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/MAX_LSN + 5);
+	}
+	/* type=LT key=1000 vlsn=MAX_LSN+20 is_prepared_ok=true */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 1000);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(MAX_LSN + 20, REPLACE, 600, 5),
+				INT64_MAX},
+			{STMT_TEMPLATE(MAX_LSN + 5, REPLACE, 400, 1),
+				INT64_MAX},
+			{STMT_TEMPLATE(MAX_LSN + 15, REPLACE, 300, 1),
+				INT64_MAX},
+			{STMT_TEMPLATE(25, REPLACE, 200, 2), INT64_MAX},
+			{STMT_TEMPLATE(MAX_LSN + 20, REPLACE, 100, 4),
+				INT64_MAX},
+		};
+		test_iterator_helper(mem, ITER_LT, &key, /*vlsn=*/MAX_LSN + 20,
+				     /*is_prepared_ok=*/true,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/INT64_MAX);
+	}
+	/* type=EQ key=600 vlsn=20 is_prepared_ok=false */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 600);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(10, REPLACE, 600, 2), INT64_MAX},
+		};
+		test_iterator_helper(mem, ITER_EQ, &key, /*vlsn=*/20,
+				     /*is_prepared_ok=*/false,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/INT64_MAX);
+	}
+	/* type=EQ key=600 vlsn=MAX_LSN+1 is_prepared_ok=false */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 600);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(30, REPLACE, 600, 4), INT64_MAX},
+		};
+		test_iterator_helper(mem, ITER_EQ, &key, /*vlsn=*/MAX_LSN + 1,
+				     /*is_prepared_ok=*/false,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/INT64_MAX);
+	}
+	/* type=EQ key=600 vlsn=MAX_LSN+20 is_prepared_ok=false */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 600);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(30, REPLACE, 600, 4), MAX_LSN + 20},
+		};
+		test_iterator_helper(mem, ITER_EQ, &key, /*vlsn=*/MAX_LSN + 20,
+				     /*is_prepared_ok=*/false,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/MAX_LSN + 20);
+	}
+	/* type=EQ key=600 vlsn=MAX_LSN+20 is_prepared_ok=true */
+	{
+		struct vy_stmt_template key = STMT_TEMPLATE(0, SELECT, 600);
+		struct test_iterator_expected expected[] = {
+			{STMT_TEMPLATE(MAX_LSN + 20, REPLACE, 600, 5),
+				INT64_MAX},
+		};
+		test_iterator_helper(mem, ITER_EQ, &key, /*vlsn=*/MAX_LSN + 20,
+				     /*is_prepared_ok=*/true,
+				     expected, lengthof(expected),
+				     /*min_skipped_plsn=*/INT64_MAX);
+	}
+	vy_mem_delete(mem);
+	footer();
+	check_plan();
+}
+
 int
 main(void)
 {
 	vy_iterator_C_test_init(0);
 
-	plan(2);
+	plan(3);
 
 	uint32_t fields[] = { 0 };
 	uint32_t types[] = { FIELD_TYPE_UNSIGNED };
@@ -314,6 +583,7 @@ main(void)
 
 	test_basic();
 	test_iterator_restore_after_insertion();
+	test_iterator_skip_prepared();
 
 	tuple_format_unref(format);
 	key_def_delete(key_def);
-- 
GitLab