From e576ca91c9555a3eb1012a05fd873f56565f028d Mon Sep 17 00:00:00 2001
From: Alexandr Lyapunov <aleks@tarantool.org>
Date: Fri, 21 Jul 2017 18:47:48 +0300
Subject: [PATCH] vinyl: fix a crash in mem_iterator_restore

Fix a bug in restoration LE/LT when last_stmt is not in the mem
(it came from another mem or run).

Fix a bug in restoration after EOF reached.

Add a unit test of mem_iterator_restore.

Fixes #2614
---
 src/box/vy_cache.c      |   2 +-
 src/box/vy_mem.c        |  22 ++-
 test/unit/vy_mem.c      | 399 +++++++++++++++++++++++++++++++++++++++-
 test/unit/vy_mem.result |  10 +
 test/vinyl/gh.result    |  56 ++++++
 test/vinyl/gh.test.lua  |  37 ++++
 6 files changed, 522 insertions(+), 4 deletions(-)

diff --git a/src/box/vy_cache.c b/src/box/vy_cache.c
index 850d286a19..6f1bfa3b7d 100644
--- a/src/box/vy_cache.c
+++ b/src/box/vy_cache.c
@@ -316,7 +316,7 @@ vy_cache_add(struct vy_cache *cache, struct tuple *stmt,
 	} else {
 		vy_cache_tree_iterator_next(&cache->cache_tree, &inserted);
 	}
-	/* Check that there are not statements between prev_stmt and stmt */
+	/* Check that there are no statements between prev_stmt and stmt */
 	if (!vy_cache_tree_iterator_is_invalid(&inserted)) {
 		struct vy_cache_entry **prev_check_entry =
 			vy_cache_tree_iterator_get_elem(&cache->cache_tree,
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index 1d97cdf41b..843efdc970 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -598,7 +598,7 @@ vy_mem_iterator_next_lsn(struct vy_stmt_iterator *vitr, struct tuple **ret)
  * Restore the current position (if necessary).
  * @sa struct vy_stmt_iterator comments.
  *
- * @param last_stmt the key the iterator was positioned on
+ * @param last_stmt the key the the read iterator was positioned on.
  *
  * @retval 0 nothing changed
  * @retval 1 iterator position was changed
@@ -714,6 +714,20 @@ vy_mem_iterator_restore(struct vy_stmt_iterator *vitr,
 			vy_mem_iterator_start_from(itr, iterator_type,
 						   last_stmt);
 		}
+		if (itr->curr_stmt != NULL && last_stmt != NULL) {
+			if (vy_stmt_compare(itr->curr_stmt, last_stmt, def) == 0 &&
+			    vy_stmt_lsn(itr->curr_stmt) >= vy_stmt_lsn(last_stmt)) {
+				while (true) {
+					if (vy_mem_iterator_next_lsn_impl(itr)) {
+						int rc = vy_mem_iterator_next_key_impl(itr);
+						(void)rc;
+						break;
+					} else if (vy_stmt_lsn(itr->curr_stmt) < vy_stmt_lsn(last_stmt)) {
+						break;
+					}
+				}
+			}
+		}
 		if (itr->curr_stmt != NULL &&
 		    vy_mem_iterator_copy_to(itr, ret) < 0)
 			return -1;
@@ -817,7 +831,10 @@ vy_mem_iterator_restore(struct vy_stmt_iterator *vitr,
 	}
 	/* scan key by key */
 	assert(!curr_last);
-	bool is_last  = false;
+	int cmp = vy_stmt_compare(this_key, last_stmt, def);
+	if (cmp > 0)
+		goto done; /* (2) */
+	bool is_last = cmp == 0;
 	bool found = false; /* found within this key */
 	while (true) {
 		if (!found &&
@@ -839,6 +856,7 @@ vy_mem_iterator_restore(struct vy_stmt_iterator *vitr,
 		int cmp = vy_stmt_compare(t, this_key, def);
 		assert(cmp >= 0);
 		if (cmp > 0) {
+			/* The next key starts */
 			if (is_last)
 				goto done; /* (2) */
 			cmp = vy_stmt_compare(t, last_stmt, def);
diff --git a/test/unit/vy_mem.c b/test/unit/vy_mem.c
index 03f0443ac0..1e24a6104a 100644
--- a/test/unit/vy_mem.c
+++ b/test/unit/vy_mem.c
@@ -1,9 +1,10 @@
+#include <trivia/config.h>
 #include "memory.h"
 #include "fiber.h"
 #include <small/slab_cache.h>
 #include "vy_iterators_helper.h"
 
-void
+static void
 test_basic(void)
 {
 	header();
@@ -69,6 +70,400 @@ test_basic(void)
 	check_plan();
 }
 
+static void
+test_iterator_initial_restore()
+{
+	header();
+
+	plan(3);
+
+	/* Create key_def */
+	uint32_t fields[] = { 0 };
+	uint32_t types[] = { FIELD_TYPE_UNSIGNED };
+	struct key_def *key_def = box_key_def_new(fields, types, 1);
+	assert(key_def != NULL);
+
+	/* Create format */
+	struct tuple_format *format = tuple_format_new(&vy_tuple_format_vtab,
+						       &key_def, 1, 0);
+	assert(format != NULL);
+
+	/* Create format with column mask */
+	struct tuple_format *format_with_colmask =
+		vy_tuple_format_new_with_colmask(format);
+	assert(format_with_colmask != NULL);
+
+	/* Create upsert format */
+	struct tuple_format *format_upsert =
+		vy_tuple_format_new_upsert(format);
+	assert(format_upsert != NULL);
+
+	/* Create lsregion */
+	struct lsregion lsregion;
+	struct slab_cache *slab_cache = cord_slab_cache();
+	lsregion_create(&lsregion, slab_cache->arena);
+
+	/* Create mem */
+	int64_t generation = 1;
+	struct vy_mem *mem = vy_mem_new(&lsregion, generation, key_def,
+					format, format_with_colmask,
+					format_upsert, 0);
+
+	const uint64_t count = 100;
+	for (uint64_t i = 0; i < count; i++) {
+		const struct vy_stmt_template stmts[2] = {
+			STMT_TEMPLATE(200, REPLACE, i * 2 + 1),
+			STMT_TEMPLATE(300, REPLACE, i * 2 + 1)
+		};
+		vy_mem_insert_template(mem, &stmts[0]);
+		vy_mem_insert_template(mem, &stmts[1]);
+	}
+
+	/* initial restore */
+	bool wrong_rc = false;
+	bool wrong_lsn = false;
+	bool wrong_value = false;
+	int i_fail = 0;
+	for (uint64_t i = 0; i < (count * 2 + 1) * 3; i++) {
+		uint64_t key = i % (count * 2 + 1);
+		int64_t lsn = (i / (count * 2 + 1)) * 100 + 100;
+		bool value_is_expected = lsn != 100 && (key % 2 == 1);
+		char data[16];
+		char *end = data;
+		end = mp_encode_uint(end, key);
+		assert(end <= data + sizeof(data));
+		struct tuple *stmt = vy_stmt_new_select(mem->format, data, 1);
+
+		struct vy_mem_iterator itr;
+		struct vy_mem_iterator_stat stats = {0};
+		struct vy_read_view rv;
+		rv.vlsn = lsn;
+		const struct vy_read_view *prv = &rv;
+		vy_mem_iterator_open(&itr, &stats, mem, ITER_EQ, stmt,
+				     &prv, NULL);
+		struct tuple *t;
+		bool stop = false;
+		int rc = itr.base.iface->restore(&itr.base, NULL, &t, &stop);
+
+		if (rc != 0) {
+			wrong_rc = true;
+			i_fail = i;
+			itr.base.iface->cleanup(&itr.base);
+			itr.base.iface->close(&itr.base);
+			continue;
+		}
+
+		if (value_is_expected) {
+			if (t == NULL) {
+				wrong_value = true;
+				i_fail = i;
+				itr.base.iface->cleanup(&itr.base);
+				itr.base.iface->close(&itr.base);
+				continue;
+			}
+			if (vy_stmt_lsn(t) != lsn) {
+				wrong_lsn = true;
+				i_fail = i;
+			}
+			uint32_t got_val;
+			if (tuple_field_u32(t, 0, &got_val) ||
+			    got_val != key) {
+				wrong_value = true;
+				i_fail = i;
+			}
+		} else {
+			if (t != NULL) {
+				wrong_value = true;
+				i_fail = i;
+			}
+		}
+
+		itr.base.iface->cleanup(&itr.base);
+		itr.base.iface->close(&itr.base);
+
+		tuple_unref(stmt);
+	}
+
+	ok(!wrong_rc, "check rc %d", i_fail);
+	ok(!wrong_lsn, "check lsn %d", i_fail);
+	ok(!wrong_value, "check value %d", i_fail);
+
+	/* Clean up */
+	vy_mem_delete(mem);
+	lsregion_destroy(&lsregion);
+	box_key_def_delete(key_def);
+
+	fiber_gc();
+
+	check_plan();
+
+	footer();
+}
+
+static void
+test_iterator_restore_after_insertion()
+{
+	header();
+
+	plan(1);
+
+	/* Create key_def */
+	uint32_t fields[] = { 0 };
+	uint32_t types[] = { FIELD_TYPE_UNSIGNED };
+	struct key_def *key_def = box_key_def_new(fields, types, 1);
+	assert(key_def != NULL);
+
+	/* Create format */
+	struct tuple_format *format = tuple_format_new(&vy_tuple_format_vtab,
+						       &key_def, 1, 0);
+	assert(format != NULL);
+	tuple_format_ref(format, 1);
+
+	/* Create format with column mask */
+	struct tuple_format *format_with_colmask =
+		vy_tuple_format_new_with_colmask(format);
+	assert(format_with_colmask != NULL);
+	tuple_format_ref(format_with_colmask, 1);
+
+	/* Create upsert format */
+	struct tuple_format *format_upsert =
+		vy_tuple_format_new_upsert(format);
+	assert(format_upsert != NULL);
+	tuple_format_ref(format_upsert, 1);
+
+	/* Create lsregion */
+	struct lsregion lsregion;
+	struct slab_cache *slab_cache = cord_slab_cache();
+	lsregion_create(&lsregion, slab_cache->arena);
+
+	struct tuple *select_key = vy_stmt_new_select(format, "", 0);
+
+	uint64_t restore_on_value = 20;
+	uint64_t restore_on_value_reverse = 60;
+	char data[16];
+	char *end = data;
+	end = mp_encode_array(end, 1);
+	end = mp_encode_uint(end, restore_on_value);
+	struct tuple *restore_on_key = vy_stmt_new_replace(format, data, end);
+	vy_stmt_set_lsn(restore_on_key, 100);
+	end = data;
+	end = mp_encode_array(end, 1);
+	end = mp_encode_uint(end, restore_on_value_reverse);
+	struct tuple *restore_on_key_reverse = vy_stmt_new_replace(format, data, end);
+	vy_stmt_set_lsn(restore_on_key_reverse, 100);
+
+	bool wrong_output = false;
+	int i_fail = 0;
+
+	for (uint64_t i = 0; i < ((1000ULL * 3) << 2); i++) {
+		uint64_t v = i;
+		bool direct = !(v & 1);
+		v >>= 1;
+		bool has40_50 = v & 1;
+		v >>= 1;
+		bool has40_150 = v & 1;
+		v >>= 1;
+		const size_t possible_count = 9;
+		uint64_t middle_value = possible_count / 2 * 10; /* 40 */
+		bool hasX_100[possible_count]; /* X = 0,10,20,30,40,50,60,70,80 */
+		bool addX_100[possible_count]; /* X = 0,10,20,30,40,50,60,70,80 */
+		bool add_smth = false;
+		for (size_t j = 0; j < possible_count; j++) {
+			uint64_t trinity = v % 3;
+			v /= 3;
+			hasX_100[j] = trinity == 1;
+			addX_100[j] = trinity == 2;
+			add_smth = add_smth || addX_100[j];
+		}
+		if (!add_smth)
+			continue;
+		uint64_t expected_count = 0;
+		uint64_t expected_values[possible_count];
+		int64_t expected_lsns[possible_count];
+		if (direct) {
+			for (size_t j = 0; j < possible_count; j++) {
+				if (hasX_100[j]) {
+					expected_values[expected_count] = j * 10;
+					expected_lsns[expected_count] = 100;
+					expected_count++;
+				} else if (j == possible_count / 2 && has40_50) {
+					expected_values[expected_count] = middle_value;
+					expected_lsns[expected_count] = 50;
+					expected_count++;
+				}
+			}
+		} else {
+			for (size_t k = possible_count; k > 0; k--) {
+				size_t j = k - 1;
+				if (hasX_100[j]) {
+					expected_values[expected_count] = j * 10;
+					expected_lsns[expected_count] = 100;
+					expected_count++;
+				} else if (j == possible_count / 2 && has40_50) {
+					expected_values[expected_count] = middle_value;
+					expected_lsns[expected_count] = 50;
+					expected_count++;
+				}
+			}
+		}
+
+		/* Create mem */
+		int64_t generation = 1;
+		struct vy_mem *mem = vy_mem_new(&lsregion, generation, key_def,
+						format, format_with_colmask,
+						format_upsert, 0);
+		if (has40_50) {
+			const struct vy_stmt_template temp =
+				STMT_TEMPLATE(50, REPLACE, 40);
+			vy_mem_insert_template(mem, &temp);
+		}
+		if (has40_150) {
+			const struct vy_stmt_template temp =
+				STMT_TEMPLATE(150, REPLACE, 40);
+			vy_mem_insert_template(mem, &temp);
+		}
+		for (size_t j = 0; j < possible_count; j++) {
+			if (hasX_100[j]) {
+				const struct vy_stmt_template temp =
+					STMT_TEMPLATE(100, REPLACE, j * 10);
+				vy_mem_insert_template(mem, &temp);
+			}
+		}
+
+		struct vy_mem_iterator itr;
+		struct vy_mem_iterator_stat stats = {0};
+		struct vy_read_view rv;
+		rv.vlsn = 100;
+		const struct vy_read_view *prv = &rv;
+		vy_mem_iterator_open(&itr, &stats, mem,
+				     direct ? ITER_GE : ITER_LE, select_key,
+				     &prv, NULL);
+		struct tuple *t;
+		bool stop = false;
+		int rc = itr.base.iface->next_key(&itr.base, &t, &stop);
+		assert(rc == 0);
+		size_t j = 0;
+		while (t != NULL) {
+			if (j >= expected_count) {
+				wrong_output = true;
+				break;
+			}
+			uint32_t val = 42;
+			tuple_field_u32(t, 0, &val);
+			if (val != expected_values[j] ||
+			    vy_stmt_lsn(t) != expected_lsns[j]) {
+				wrong_output = true;
+				break;
+			}
+			j++;
+			if (direct && val >= middle_value)
+				break;
+			else if(!direct && val <= middle_value)
+				break;
+			int rc = itr.base.iface->next_key(&itr.base, &t, &stop);
+			assert(rc == 0);
+		}
+		if (t == NULL && j != expected_count)
+			wrong_output = true;
+		if (wrong_output) {
+			i_fail = i;
+			break;
+		}
+
+
+		for (size_t j = 0; j < possible_count; j++) {
+			if (addX_100[j]) {
+				const struct vy_stmt_template temp =
+					STMT_TEMPLATE(100, REPLACE, j * 10);
+				vy_mem_insert_template(mem, &temp);
+			}
+		}
+
+		expected_count = 0;
+		if (direct) {
+			for (size_t j = 0; j < possible_count; j++) {
+				if (j * 10 <= restore_on_value)
+					continue;
+				if (hasX_100[j] || addX_100[j]) {
+					expected_values[expected_count] = j * 10;
+					expected_lsns[expected_count] = 100;
+					expected_count++;
+				} else if (j == possible_count / 2 && has40_50) {
+					expected_values[expected_count] = middle_value;
+					expected_lsns[expected_count] = 50;
+					expected_count++;
+				}
+			}
+		} else {
+			for (size_t k = possible_count; k > 0; k--) {
+				size_t j = k - 1;
+				if (j * 10 >= restore_on_value_reverse)
+					continue;
+				if (hasX_100[j] || addX_100[j]) {
+					expected_values[expected_count] = j * 10;
+					expected_lsns[expected_count] = 100;
+					expected_count++;
+				} else if (j == possible_count / 2 && has40_50) {
+					expected_values[expected_count] = middle_value;
+					expected_lsns[expected_count] = 50;
+					expected_count++;
+				}
+			}
+		}
+
+		if (direct)
+			rc = itr.base.iface->restore(&itr.base, restore_on_key, &t, &stop);
+		else
+			rc = itr.base.iface->restore(&itr.base, restore_on_key_reverse, &t, &stop);
+
+		j = 0;
+		while (t != NULL) {
+			if (j >= expected_count) {
+				wrong_output = true;
+				break;
+			}
+			uint32_t val = 42;
+			tuple_field_u32(t, 0, &val);
+			if (val != expected_values[j] ||
+			    vy_stmt_lsn(t) != expected_lsns[j]) {
+				wrong_output = true;
+				break;
+			}
+			j++;
+			int rc = itr.base.iface->next_key(&itr.base, &t, &stop);
+			assert(rc == 0);
+		}
+		if (j != expected_count)
+			wrong_output = true;
+		if (wrong_output) {
+			i_fail = i;
+			break;
+		}
+
+		vy_mem_delete(mem);
+		lsregion_gc(&lsregion, 2);
+	}
+
+	ok(!wrong_output, "check wrong_output %d", i_fail);
+
+	/* Clean up */
+	tuple_unref(select_key);
+	tuple_unref(restore_on_key);
+	tuple_unref(restore_on_key_reverse);
+
+	tuple_format_ref(format_upsert, -1);
+	tuple_format_ref(format_with_colmask, -1);
+	tuple_format_ref(format, -1);
+	lsregion_destroy(&lsregion);
+	box_key_def_delete(key_def);
+
+	fiber_gc();
+
+	check_plan();
+
+	footer();
+}
+
 int
 main(int argc, char *argv[])
 {
@@ -77,6 +472,8 @@ main(int argc, char *argv[])
 	tuple_init();
 
 	test_basic();
+	test_iterator_initial_restore();
+	test_iterator_restore_after_insertion();
 
 	tuple_free();
 	fiber_free();
diff --git a/test/unit/vy_mem.result b/test/unit/vy_mem.result
index 1a885962f9..6da4070c7c 100644
--- a/test/unit/vy_mem.result
+++ b/test/unit/vy_mem.result
@@ -13,3 +13,13 @@ ok 10 - vy_mem_rollback 2
 ok 11 - vy_mem->version
 ok 12 - vy_mem->version
 	*** test_basic: done ***
+	*** test_iterator_initial_restore ***
+1..3
+ok 1 - check rc 0
+ok 2 - check lsn 0
+ok 3 - check value 0
+	*** test_iterator_initial_restore: done ***
+	*** test_iterator_restore_after_insertion ***
+1..1
+ok 1 - check wrong_output 0
+	*** test_iterator_restore_after_insertion: done ***
diff --git a/test/vinyl/gh.result b/test/vinyl/gh.result
index 184471300f..60aa34dbf3 100644
--- a/test/vinyl/gh.result
+++ b/test/vinyl/gh.result
@@ -654,3 +654,59 @@ _ = s:replace({1, string.rep('x', 35 * 1024 * 1024)})
 s:drop()
 ---
 ...
+-- https://github.com/tarantool/tarantool/issues/2614
+count = 10000
+---
+...
+s = box.schema.space.create("test", {engine='vinyl'})
+---
+...
+_ = s:create_index('pk')
+---
+...
+cont = true
+---
+...
+finished = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+_ = fiber.create(function()
+    while cont do
+        s:select(math.random(count), {iterator = box.index.LE, limit = 10})
+        fiber.sleep(0.01)
+    end
+    finished = finished + 1
+end);
+---
+...
+_ = fiber.create(function()
+    while cont do
+        box.snapshot()
+        fiber.sleep(0.01)
+    end
+    finished = finished + 1
+end);
+---
+...
+for i = 1, count do
+    s:replace{math.random(count)}
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+cont = false
+---
+...
+while finished ~= 2 do fiber.sleep(0.01) end
+---
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/gh.test.lua b/test/vinyl/gh.test.lua
index eacccc9b8a..e4aad67519 100644
--- a/test/vinyl/gh.test.lua
+++ b/test/vinyl/gh.test.lua
@@ -272,3 +272,40 @@ s = box.schema.space.create('vinyl', { engine = 'vinyl' })
 i = box.space.vinyl:create_index('primary')
 _ = s:replace({1, string.rep('x', 35 * 1024 * 1024)})
 s:drop()
+
+-- https://github.com/tarantool/tarantool/issues/2614
+
+count = 10000
+
+s = box.schema.space.create("test", {engine='vinyl'})
+_ = s:create_index('pk')
+
+cont = true
+finished = 0
+
+test_run:cmd("setopt delimiter ';'")
+_ = fiber.create(function()
+    while cont do
+        s:select(math.random(count), {iterator = box.index.LE, limit = 10})
+        fiber.sleep(0.01)
+    end
+    finished = finished + 1
+end);
+
+_ = fiber.create(function()
+    while cont do
+        box.snapshot()
+        fiber.sleep(0.01)
+    end
+    finished = finished + 1
+end);
+
+for i = 1, count do
+    s:replace{math.random(count)}
+end;
+test_run:cmd("setopt delimiter ''");
+
+cont = false
+while finished ~= 2 do fiber.sleep(0.01) end
+
+s:drop()
-- 
GitLab