From 26af611f0c97bc310edd721814f31a02e2424f45 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov.dev@gmail.com>
Date: Tue, 18 Jul 2017 19:47:06 +0300
Subject: [PATCH] vinyl: use LSN from WAL instead of index_opts->lsn in vylog

An index can be dropped and then recreated with the same space/index id.
To discriminate between different incarnations of the same index during
recovery, we use index LSN stored in index options, as it is supposed to
be unique. However, the uniqueness property doesn't always hold:

 - If two indexes are created from different fibers, they might receive
   the same LSN.

 - If an index is created by inserting a record into _index system space
   directly, without using the public API, as it is the case in case of
   logical backup, its LSN might conflict with the LSN of an existing
   index or a previous incarnation of the same index stored in vylog.

These exceptions can result in unrecoverable errors during local
recovery, like this one:

  F> can't initialize storage: Invalid VYLOG file: Duplicate index id 3

Besides, storing LSN in index options is ugly, because LSN isn't a
user-defined option - it's a part of the implementation.

To fix this issues, let's use the LSN passed to Index::commitCreate,
i.e. the actual LSN received by the row that created the index. There's
one problem though: snapshot rows don't store LSNs. However, it doesn't
mean we can't find the index in vylog corresponding to a snapshot row:
we just need to look up the index by space_id/index_id instead of LSN
and then compare the snapshot LSN with the LSN of the last index
incarnation stored in vylog - if the latter turns out to be less, then
we need to load the index, otherwise the index is going to be dropped
and we need to load a dummy index. For more details, see the comment to
vy_recovery_load_index().

Another issue that needs a clarification is backward compatibility. The
thing is the LSN written to the index options lags behind the actual LSN
assigned to the row that created the index by 1. So to preserve backward
compatibility, we use LSN from index options for legacy indexes that
have it, while for indexes created after this patch we don't store LSN
in index options (set it to 0), neither do we use it on recovery (use
row LSN instead).

Closes #2536
---
 src/box/lua/schema.lua   |   2 -
 src/box/vinyl.c          |  55 +++++-----
 src/box/vy_index.c       |  67 +++++++++---
 src/box/vy_index.h       |  23 ++--
 src/box/vy_log.c         | 229 +++++++++++++++++++++++++++++++--------
 src/box/vy_log.h         |  39 ++++++-
 test/vinyl/layout.result |  16 +--
 7 files changed, 319 insertions(+), 112 deletions(-)

diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 0b760ce16d..62964ee346 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -505,8 +505,6 @@ box.schema.index.create = function(space_id, name, options)
             run_count_per_level = options.run_count_per_level,
             run_size_ratio = options.run_size_ratio,
             bloom_fpr = options.bloom_fpr,
-            lsn = box.info.signature,
-            bloom_fpr = options.bloom_fpr
     }
     local field_type_aliases = {
         num = 'unsigned'; -- Deprecated since 1.7.2
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index d364038bbf..75b6815a44 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -301,7 +301,7 @@ vy_run_prepare(struct vy_index *index)
 	if (run == NULL)
 		return NULL;
 	vy_log_tx_begin();
-	vy_log_prepare_run(index->opts.lsn, run->id);
+	vy_log_prepare_run(index->commit_lsn, run->id);
 	if (vy_log_tx_commit() < 0) {
 		vy_run_unref(run);
 		return NULL;
@@ -681,7 +681,7 @@ vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 		 * to log index dump anyway.
 		 */
 		vy_log_tx_begin();
-		vy_log_dump_index(index->opts.lsn, dump_lsn);
+		vy_log_dump_index(index->commit_lsn, dump_lsn);
 		if (vy_log_tx_commit() < 0)
 			goto fail;
 		vy_run_discard(new_run);
@@ -741,7 +741,7 @@ vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 	 * Log change in metadata.
 	 */
 	vy_log_tx_begin();
-	vy_log_create_run(index->opts.lsn, new_run->id, dump_lsn);
+	vy_log_create_run(index->commit_lsn, new_run->id, dump_lsn);
 	for (range = begin_range, i = 0; range != end_range;
 	     range = vy_range_tree_next(index->tree, range), i++) {
 		assert(i < index->range_count);
@@ -753,7 +753,7 @@ vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 		if (++loops % VY_YIELD_LOOPS == 0)
 			fiber_sleep(0); /* see comment above */
 	}
-	vy_log_dump_index(index->opts.lsn, dump_lsn);
+	vy_log_dump_index(index->commit_lsn, dump_lsn);
 	if (vy_log_tx_commit() < 0)
 		goto fail_free_slices;
 
@@ -1065,7 +1065,7 @@ vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 	rlist_foreach_entry(run, &unused_runs, in_unused)
 		vy_log_drop_run(run->id, gc_lsn);
 	if (new_slice != NULL) {
-		vy_log_create_run(index->opts.lsn, new_run->id,
+		vy_log_create_run(index->commit_lsn, new_run->id,
 				  new_run->dump_lsn);
 		vy_log_insert_slice(range->id, new_run->id, new_slice->id,
 				    tuple_data_or_null(new_slice->begin),
@@ -2274,8 +2274,6 @@ vy_delete_index(struct vy_env *env, struct vy_index *index)
 int
 vy_index_open(struct vy_env *env, struct vy_index *index)
 {
-	bool allow_missing = false;
-
 	switch (env->status) {
 	case VINYL_ONLINE:
 		/*
@@ -2299,19 +2297,9 @@ vy_index_open(struct vy_env *env, struct vy_index *index)
 		 * have already been created, so try to load
 		 * the index files from it.
 		 */
-		if (env->status == VINYL_FINAL_RECOVERY_LOCAL) {
-			/*
-			 * If we failed to log index creation before
-			 * restart, we won't find it in the log on
-			 * recovery. This is OK as the index doesn't
-			 * have any runs in this case. We will retry
-			 * to log index in vy_index_commit_create().
-			 * For now, initialize the index as new.
-			 */
-			allow_missing = true;
-		}
 		return vy_index_recover(index, env->recovery,
-					allow_missing);
+				vclock_sum(env->recovery_vclock),
+				env->status == VINYL_INITIAL_RECOVERY_LOCAL);
 	default:
 		unreachable();
 		return -1;
@@ -2321,8 +2309,6 @@ vy_index_open(struct vy_env *env, struct vy_index *index)
 void
 vy_index_commit_create(struct vy_env *env, struct vy_index *index, int64_t lsn)
 {
-	(void)lsn;
-
 	if (env->status == VINYL_INITIAL_RECOVERY_LOCAL ||
 	    env->status == VINYL_FINAL_RECOVERY_LOCAL) {
 		/*
@@ -2333,13 +2319,23 @@ vy_index_commit_create(struct vy_env *env, struct vy_index *index, int64_t lsn)
 		 * the index isn't in the recovery context and we
 		 * need to retry to log it now.
 		 */
-		if (index->is_committed) {
+		if (index->commit_lsn >= 0) {
 			vy_scheduler_add_index(env->scheduler, index);
 			return;
 		}
 	}
 
-	index->is_committed = true;
+	/*
+	 * Backward compatibility fixup: historically, we used
+	 * box.info.signature for LSN of index creation, which
+	 * lags behind the LSN of the record that created the
+	 * index by 1. So for legacy indexes use the LSN from
+	 * index options.
+	 */
+	if (index->opts.lsn != 0)
+		lsn = index->opts.lsn;
+
+	index->commit_lsn = lsn;
 
 	assert(index->range_count == 1);
 	struct vy_range *range = vy_range_tree_first(index->tree);
@@ -2353,9 +2349,9 @@ vy_index_commit_create(struct vy_env *env, struct vy_index *index, int64_t lsn)
 	 * recovery.
 	 */
 	vy_log_tx_begin();
-	vy_log_create_index(index->opts.lsn, index->id,
+	vy_log_create_index(index->commit_lsn, index->id,
 			    index->space_id, index->user_key_def);
-	vy_log_insert_range(index->opts.lsn, range->id, NULL, NULL);
+	vy_log_insert_range(index->commit_lsn, range->id, NULL, NULL);
 	if (vy_log_tx_try_commit() != 0)
 		say_warn("failed to log index creation: %s",
 			 diag_last_error(diag_get())->errmsg);
@@ -2411,7 +2407,7 @@ vy_index_commit_drop(struct vy_env *env, struct vy_index *index)
 
 	vy_log_tx_begin();
 	vy_log_index_prune(index, vclock_sum(&env->scheduler->last_checkpoint));
-	vy_log_drop_index(index->opts.lsn);
+	vy_log_drop_index(index->commit_lsn);
 	if (vy_log_tx_try_commit() < 0)
 		say_warn("failed to log drop index: %s",
 			 diag_last_error(diag_get())->errmsg);
@@ -2451,6 +2447,8 @@ vy_prepare_truncate_space(struct vy_env *env, struct space *old_space,
 		struct vy_index *old_index = vy_index(old_space->index[i]);
 		struct vy_index *new_index = vy_index(new_space->index[i]);
 
+		new_index->commit_lsn = old_index->commit_lsn;
+
 		if (truncate_done) {
 			/*
 			 * We are replaying truncate from WAL and the
@@ -2525,8 +2523,9 @@ vy_commit_truncate_space(struct vy_env *env, struct space *old_space,
 		assert(new_index->range_count == 1);
 
 		vy_log_index_prune(old_index, gc_lsn);
-		vy_log_insert_range(new_index->opts.lsn, range->id, NULL, NULL);
-		vy_log_truncate_index(new_index->opts.lsn,
+		vy_log_insert_range(new_index->commit_lsn,
+				    range->id, NULL, NULL);
+		vy_log_truncate_index(new_index->commit_lsn,
 				      new_index->truncate_count);
 	}
 	if (vy_log_tx_try_commit() < 0)
diff --git a/src/box/vy_index.c b/src/box/vy_index.c
index 89e458dbf8..fddc9e91d5 100644
--- a/src/box/vy_index.c
+++ b/src/box/vy_index.c
@@ -202,6 +202,7 @@ vy_index_new(struct vy_index_env *index_env, struct vy_cache_env *cache_env,
 		goto fail_mem;
 
 	index->refs = 1;
+	index->commit_lsn = -1;
 	index->dump_lsn = -1;
 	vy_cache_create(&index->cache, cache_env, key_def);
 	rlist_create(&index->sealed);
@@ -383,7 +384,7 @@ vy_index_recovery_cb(const struct vy_log_record *record, void *cb_arg)
 	struct vy_slice *slice;
 	bool success = false;
 
-	assert(record->type == VY_LOG_CREATE_INDEX || index->is_committed);
+	assert(record->type == VY_LOG_CREATE_INDEX || index->commit_lsn >= 0);
 
 	if (record->type == VY_LOG_INSERT_RANGE ||
 	    record->type == VY_LOG_INSERT_SLICE) {
@@ -401,27 +402,35 @@ vy_index_recovery_cb(const struct vy_log_record *record, void *cb_arg)
 
 	switch (record->type) {
 	case VY_LOG_CREATE_INDEX:
-		assert(record->index_lsn == index->opts.lsn);
-		index->is_committed = true;
+		assert(record->index_id == index->id);
+		assert(record->space_id == index->space_id);
+		assert(index->commit_lsn < 0);
+		assert(record->index_lsn >= 0);
+		index->commit_lsn = record->index_lsn;
 		break;
 	case VY_LOG_DUMP_INDEX:
-		assert(record->index_lsn == index->opts.lsn);
+		assert(record->index_lsn == index->commit_lsn);
 		index->dump_lsn = record->dump_lsn;
 		break;
 	case VY_LOG_TRUNCATE_INDEX:
-		assert(record->index_lsn == index->opts.lsn);
+		assert(record->index_lsn == index->commit_lsn);
 		index->truncate_count = record->truncate_count;
 		break;
 	case VY_LOG_DROP_INDEX:
-		assert(record->index_lsn == index->opts.lsn);
+		assert(record->index_lsn == index->commit_lsn);
 		index->is_dropped = true;
+		/*
+		 * If the index was dropped, we don't need to replay
+		 * truncate (see vy_prepare_truncate_space()).
+		 */
+		index->truncate_count = UINT64_MAX;
 		break;
 	case VY_LOG_PREPARE_RUN:
 		break;
 	case VY_LOG_CREATE_RUN:
 		if (record->is_dropped)
 			break;
-		assert(record->index_lsn == index->opts.lsn);
+		assert(record->index_lsn == index->commit_lsn);
 		run = vy_run_new(record->run_id);
 		if (run == NULL)
 			goto out;
@@ -443,7 +452,7 @@ vy_index_recovery_cb(const struct vy_log_record *record, void *cb_arg)
 	case VY_LOG_DROP_RUN:
 		break;
 	case VY_LOG_INSERT_RANGE:
-		assert(record->index_lsn == index->opts.lsn);
+		assert(record->index_lsn == index->commit_lsn);
 		range = vy_range_new(record->range_id, begin, end,
 				     index->key_def);
 		if (range == NULL)
@@ -486,7 +495,7 @@ vy_index_recovery_cb(const struct vy_log_record *record, void *cb_arg)
 
 int
 vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
-		 bool allow_missing)
+		 int64_t lsn, bool snapshot_recovery)
 {
 	assert(index->range_count == 0);
 
@@ -497,7 +506,18 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
 		return -1;
 	}
 
-	int rc = vy_recovery_load_index(recovery, index->opts.lsn,
+	/*
+	 * Backward compatibility fixup: historically, we used
+	 * box.info.signature for LSN of index creation, which
+	 * lags behind the LSN of the record that created the
+	 * index by 1. So for legacy indexes use the LSN from
+	 * index options.
+	 */
+	if (index->opts.lsn != 0)
+		lsn = index->opts.lsn;
+
+	int rc = vy_recovery_load_index(recovery, index->space_id, index->id,
+					lsn, snapshot_recovery,
 					vy_index_recovery_cb, &arg);
 
 	mh_int_t k;
@@ -509,7 +529,7 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
 			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 				 tt_sprintf("Unused run %lld in index %lld",
 					    (long long)run->id,
-					    (long long)index->opts.lsn));
+					    (long long)index->commit_lsn));
 			rc = -1;
 			/*
 			 * Continue the loop to unreference
@@ -526,14 +546,27 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
 		return -1;
 	}
 
-	if (!index->is_committed) {
+	if (index->commit_lsn < 0) {
 		/* Index was not found in the metadata log. */
-		if (!allow_missing) {
+		if (snapshot_recovery) {
+			/*
+			 * All indexes created from snapshot rows must
+			 * be present in vylog, because snapshot can
+			 * only succeed if vylog has been successfully
+			 * flushed.
+			 */
 			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 				 tt_sprintf("Index %lld not found",
-					    (long long)index->opts.lsn));
+					    (long long)index->commit_lsn));
 			return -1;
 		}
+		/*
+		 * If we failed to log index creation before restart,
+		 * we won't find it in the log on recovery. This is
+		 * OK as the index doesn't have any runs in this case.
+		 * We will retry to log index in vy_index_commit_create().
+		 * For now, just create the initial range.
+		 */
 		return vy_index_init_range_tree(index);
 	}
 
@@ -578,7 +611,7 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
 	if (prev == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Index %lld has empty range tree",
-				    (long long)index->opts.lsn));
+				    (long long)index->commit_lsn));
 		return -1;
 	}
 	if (prev->end != NULL) {
@@ -932,7 +965,7 @@ vy_index_split_range(struct vy_index *index, struct vy_range *range)
 	vy_log_delete_range(range->id);
 	for (int i = 0; i < n_parts; i++) {
 		part = parts[i];
-		vy_log_insert_range(index->opts.lsn, part->id,
+		vy_log_insert_range(index->commit_lsn, part->id,
 				    tuple_data_or_null(part->begin),
 				    tuple_data_or_null(part->end));
 		rlist_foreach_entry(slice, &part->slices, in_range)
@@ -999,7 +1032,7 @@ vy_index_coalesce_range(struct vy_index *index, struct vy_range *range)
 	 * Log change in metadata.
 	 */
 	vy_log_tx_begin();
-	vy_log_insert_range(index->opts.lsn, result->id,
+	vy_log_insert_range(index->commit_lsn, result->id,
 			    tuple_data_or_null(result->begin),
 			    tuple_data_or_null(result->end));
 	for (it = first; it != end; it = vy_range_tree_next(index->tree, it)) {
diff --git a/src/box/vy_index.h b/src/box/vy_index.h
index 6c0a7089a8..ed81434e62 100644
--- a/src/box/vy_index.h
+++ b/src/box/vy_index.h
@@ -220,10 +220,10 @@ struct vy_index {
 	 */
 	int64_t dump_lsn;
 	/**
-	 * This flag is set if the index creation was
-	 * committed to the metadata log.
+	 * LSN of the row that committed the index or -1 if
+	 * the index was not committed to the metadata log.
 	 */
-	bool is_committed;
+	int64_t commit_lsn;
 	/**
 	 * This flag is set if the index was dropped.
 	 * It is also set on local recovery if the index
@@ -341,13 +341,22 @@ vy_index_create(struct vy_index *index);
  * metadata log, rebuilds the range tree, and opens run
  * files.
  *
- * If @allow_missing is set, the function will not fail
- * in case the index is not found in the recovery context,
- * instead it will initialize the index as brand new.
+ * If @snapshot_recovery is set, the index is recovered from
+ * the last snapshot. In particular, this means that the index
+ * must have been logged in the metadata log and so if the
+ * function does not find it in the recovery context, it will
+ * fail. If the flag is unset, the index is recovered from a
+ * WAL, in which case a missing index is OK - it just means we
+ * failed to log it before restart and have to retry during
+ * WAL replay.
+ *
+ * @lsn is the LSN of the row that created the index.
+ * If the index is recovered from a snapshot, it is set
+ * to the snapshot signature.
  */
 int
 vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
-		 bool allow_missing);
+		 int64_t lsn, bool snapshot_recovery);
 
 /**
  * Return generation of in-memory data stored in an index
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 2c7aa84c7b..aa9a6209d5 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -169,8 +169,10 @@ static struct vy_log vy_log;
 
 /** Recovery context. */
 struct vy_recovery {
-	/** ID -> vy_index_recovery_info. */
-	struct mh_i64ptr_t *index_hash;
+	/** space_id, index_id -> vy_index_recovery_info. */
+	struct mh_i64ptr_t *index_id_hash;
+	/** index_lsn -> vy_index_recovery_info. */
+	struct mh_i64ptr_t *index_lsn_hash;
 	/** ID -> vy_range_recovery_info. */
 	struct mh_i64ptr_t *range_hash;
 	/** ID -> vy_run_recovery_info. */
@@ -1225,11 +1227,34 @@ vy_log_write(const struct vy_log_record *record)
 	}
 }
 
-/** Lookup a vinyl index in vy_recovery::index_hash map. */
+/**
+ * Given space_id and index_id, return the corresponding key in
+ * vy_recovery::index_id_hash map.
+ */
+static inline int64_t
+vy_recovery_index_id_hash(uint32_t space_id, uint32_t index_id)
+{
+	return ((uint64_t)space_id << 32) + index_id;
+}
+
+/** Lookup a vinyl index in vy_recovery::index_id_hash map. */
+static struct vy_index_recovery_info *
+vy_recovery_lookup_index_by_id(struct vy_recovery *recovery,
+			       uint32_t space_id, uint32_t index_id)
+{
+	int64_t key = vy_recovery_index_id_hash(space_id, index_id);
+	struct mh_i64ptr_t *h = recovery->index_id_hash;
+	mh_int_t k = mh_i64ptr_find(h, key, NULL);
+	if (k == mh_end(h))
+		return NULL;
+	return mh_i64ptr_node(h, k)->val;
+}
+
+/** Lookup a vinyl index in vy_recovery::index_lsn_hash map. */
 static struct vy_index_recovery_info *
-vy_recovery_lookup_index(struct vy_recovery *recovery, int64_t index_lsn)
+vy_recovery_lookup_index_by_lsn(struct vy_recovery *recovery, int64_t index_lsn)
 {
-	struct mh_i64ptr_t *h = recovery->index_hash;
+	struct mh_i64ptr_t *h = recovery->index_lsn_hash;
 	mh_int_t k = mh_i64ptr_find(h, index_lsn, NULL);
 	if (k == mh_end(h))
 		return NULL;
@@ -1280,43 +1305,104 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn,
 			 uint32_t index_id, uint32_t space_id,
 			 const struct key_def *key_def)
 {
+	struct vy_index_recovery_info *index;
+	struct key_def *key_def_copy;
+	struct mh_i64ptr_node_t node;
+	struct mh_i64ptr_t *h;
+	mh_int_t k;
+
+	/*
+	 * Make a copy of the key definition to be used for
+	 * the new index incarnation.
+	 */
 	if (key_def == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Missing key definition for index %lld",
 				    (long long)index_lsn));
 		return -1;
 	}
-	if (vy_recovery_lookup_index(recovery, index_lsn) != NULL) {
+	key_def_copy = key_def_dup(key_def);
+	if (key_def_copy == NULL)
+		return -1;
+
+	/*
+	 * Look up the index in the hash.
+	 */
+	h = recovery->index_id_hash;
+	node.key = vy_recovery_index_id_hash(space_id, index_id);
+	k = mh_i64ptr_find(h, node.key, NULL);
+	index = (k != mh_end(h)) ? mh_i64ptr_node(h, k)->val : NULL;
+
+	if (index == NULL) {
+		/*
+		 * This is the first time the index is created
+		 * (there's no previous incarnation in the context).
+		 * Allocate a node for the index and add it to
+		 * the hash.
+		 */
+		index = malloc(sizeof(*index));
+		if (index == NULL) {
+			diag_set(OutOfMemory, sizeof(*index),
+				 "malloc", "struct vy_index_recovery_info");
+			free(key_def_copy);
+			return -1;
+		}
+		index->index_id = index_id;
+		index->space_id = space_id;
+		rlist_create(&index->ranges);
+		rlist_create(&index->runs);
+
+		node.val = index;
+		if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
+			diag_set(OutOfMemory, 0, "mh_i64ptr_put",
+				 "mh_i64ptr_node_t");
+			free(key_def_copy);
+			free(index);
+			return -1;
+		}
+	} else {
+		/*
+		 * The index was dropped and recreated with the
+		 * same ID. Update its key definition (because it
+		 * could have changed since the last time it was
+		 * used) and reset its state.
+		 */
+		if (!index->is_dropped) {
+			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+				 tt_sprintf("Index %u/%u created twice",
+					    (unsigned)space_id,
+					    (unsigned)index_id));
+			free(key_def_copy);
+			return -1;
+		}
+		assert(index->index_id == index_id);
+		assert(index->space_id == space_id);
+		free(index->key_def);
+	}
+
+	index->index_lsn = index_lsn;
+	index->key_def = key_def_copy;
+	index->is_dropped = false;
+	index->dump_lsn = -1;
+	index->truncate_count = 0;
+
+	/*
+	 * Add the index to the LSN hash.
+	 */
+	h = recovery->index_lsn_hash;
+	node.key = index_lsn;
+	node.val = index;
+	if (mh_i64ptr_find(h, index_lsn, NULL) != mh_end(h)) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Duplicate index id %lld",
 				    (long long)index_lsn));
 		return -1;
 	}
-	size_t size = sizeof(struct vy_index_recovery_info) +
-			key_def_sizeof(key_def->part_count);
-	struct vy_index_recovery_info *index = malloc(size);
-	if (index == NULL) {
-		diag_set(OutOfMemory, size,
-			 "malloc", "struct vy_index_recovery_info");
-		return -1;
-	}
-	struct mh_i64ptr_t *h = recovery->index_hash;
-	struct mh_i64ptr_node_t node = { index_lsn, index };
 	if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
-		diag_set(OutOfMemory, 0, "mh_i64ptr_put", "mh_i64ptr_node_t");
-		free(index);
+		diag_set(OutOfMemory, 0, "mh_i64ptr_put",
+			 "mh_i64ptr_node_t");
 		return -1;
 	}
-	index->index_lsn = index_lsn;
-	index->index_id = index_id;
-	index->space_id = space_id;
-	index->key_def = (void *)index + sizeof(*index);
-	memcpy(index->key_def, key_def, key_def_sizeof(key_def->part_count));
-	index->is_dropped = false;
-	index->dump_lsn = -1;
-	index->truncate_count = 0;
-	rlist_create(&index->ranges);
-	rlist_create(&index->runs);
 	return 0;
 }
 
@@ -1330,7 +1416,7 @@ static int
 vy_recovery_drop_index(struct vy_recovery *recovery, int64_t index_lsn)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index(recovery, index_lsn);
+	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Index %lld deleted but not registered",
@@ -1373,7 +1459,7 @@ vy_recovery_dump_index(struct vy_recovery *recovery,
 		       int64_t index_lsn, int64_t dump_lsn)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index(recovery, index_lsn);
+	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Dump of unregistered index %lld",
@@ -1400,7 +1486,7 @@ vy_recovery_truncate_index(struct vy_recovery *recovery,
 			   int64_t index_lsn, int64_t truncate_count)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index(recovery, index_lsn);
+	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Truncation of unregistered index %lld",
@@ -1462,7 +1548,7 @@ vy_recovery_prepare_run(struct vy_recovery *recovery, int64_t index_lsn,
 			int64_t run_id)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index(recovery, index_lsn);
+	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Run %lld created for unregistered "
@@ -1498,7 +1584,7 @@ vy_recovery_create_run(struct vy_recovery *recovery, int64_t index_lsn,
 		       int64_t run_id, int64_t dump_lsn)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index(recovery, index_lsn);
+	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Run %lld created for unregistered "
@@ -1603,7 +1689,7 @@ vy_recovery_insert_range(struct vy_recovery *recovery, int64_t index_lsn,
 		return -1;
 	}
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index(recovery, index_lsn);
+	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Range %lld created for unregistered "
@@ -1882,17 +1968,20 @@ vy_recovery_new_f(va_list ap)
 		goto fail;
 	}
 
-	recovery->index_hash = NULL;
+	recovery->index_id_hash = NULL;
+	recovery->index_lsn_hash = NULL;
 	recovery->range_hash = NULL;
 	recovery->run_hash = NULL;
 	recovery->slice_hash = NULL;
 	recovery->max_id = -1;
 
-	recovery->index_hash = mh_i64ptr_new();
+	recovery->index_id_hash = mh_i64ptr_new();
+	recovery->index_lsn_hash = mh_i64ptr_new();
 	recovery->range_hash = mh_i64ptr_new();
 	recovery->run_hash = mh_i64ptr_new();
 	recovery->slice_hash = mh_i64ptr_new();
-	if (recovery->index_hash == NULL ||
+	if (recovery->index_id_hash == NULL ||
+	    recovery->index_lsn_hash == NULL ||
 	    recovery->range_hash == NULL ||
 	    recovery->run_hash == NULL ||
 	    recovery->slice_hash == NULL) {
@@ -1985,8 +2074,20 @@ vy_recovery_delete_hash(struct mh_i64ptr_t *h)
 void
 vy_recovery_delete(struct vy_recovery *recovery)
 {
-	if (recovery->index_hash != NULL)
-		vy_recovery_delete_hash(recovery->index_hash);
+	if (recovery->index_id_hash != NULL) {
+		mh_int_t i;
+		mh_foreach(recovery->index_id_hash, i) {
+			struct vy_index_recovery_info *index;
+			index = mh_i64ptr_node(recovery->index_id_hash, i)->val;
+			free(index->key_def);
+			free(index);
+		}
+		mh_i64ptr_delete(recovery->index_id_hash);
+	}
+	if (recovery->index_lsn_hash != NULL) {
+		/* Hash entries were deleted along with index_id_hash. */
+		mh_i64ptr_delete(recovery->index_lsn_hash);
+	}
 	if (recovery->range_hash != NULL)
 		vy_recovery_delete_hash(recovery->range_hash);
 	if (recovery->run_hash != NULL)
@@ -2109,9 +2210,9 @@ vy_recovery_iterate(struct vy_recovery *recovery,
 		    vy_recovery_cb cb, void *cb_arg)
 {
 	mh_int_t i;
-	mh_foreach(recovery->index_hash, i) {
+	mh_foreach(recovery->index_id_hash, i) {
 		struct vy_index_recovery_info *index;
-		index = mh_i64ptr_node(recovery->index_hash, i)->val;
+		index = mh_i64ptr_node(recovery->index_id_hash, i)->val;
 		/*
 		 * Purge dropped indexes that are not referenced by runs
 		 * (and thus not needed for garbage collection) from the
@@ -2126,12 +2227,52 @@ vy_recovery_iterate(struct vy_recovery *recovery,
 }
 
 int
-vy_recovery_load_index(struct vy_recovery *recovery, int64_t index_lsn,
+vy_recovery_load_index(struct vy_recovery *recovery,
+		       uint32_t space_id, uint32_t index_id,
+		       int64_t index_lsn, bool snapshot_recovery,
 		       vy_recovery_cb cb, void *cb_arg)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index(recovery, index_lsn);
+	index = vy_recovery_lookup_index_by_id(recovery, space_id, index_id);
 	if (index == NULL)
 		return 0;
-	return vy_recovery_iterate_index(index, cb, cb_arg);
+	/* See the comment to the function declaration. */
+	if (index_lsn < index->index_lsn) {
+		/*
+		 * Loading a past incarnation of the index.
+		 * Emit create/drop records to indicate that
+		 * it is going to be dropped by a WAL statement
+		 * and hence doesn't need to be recovered.
+		 */
+		struct vy_log_record record;
+		vy_log_record_init(&record);
+		record.type = VY_LOG_CREATE_INDEX;
+		record.index_id = index->index_id;
+		record.space_id = index->space_id;
+		record.index_lsn = index_lsn;
+		if (vy_recovery_cb_call(cb, cb_arg, &record) != 0)
+			return -1;
+		vy_log_record_init(&record);
+		record.type = VY_LOG_DROP_INDEX;
+		record.index_lsn = index_lsn;
+		if (vy_recovery_cb_call(cb, cb_arg, &record) != 0)
+			return -1;
+		return 0;
+	} else if (snapshot_recovery || index_lsn == index->index_lsn) {
+		/*
+		 * Loading the last incarnation of the index.
+		 * Replay all records we have recovered from
+		 * the log for this index.
+		 */
+		return vy_recovery_iterate_index(index, cb, cb_arg);
+	} else {
+		/*
+		 * The requested incarnation is missing in the recovery
+		 * context, because we failed to log it before restart.
+		 * Do nothing and let the caller retry logging.
+		 */
+		assert(!snapshot_recovery);
+		assert(index_lsn > index->index_lsn);
+		return 0;
+	}
 }
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index 953dd2938f..676b8e64b0 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -163,11 +163,8 @@ struct vy_log_record {
 	/** Type of the record. */
 	enum vy_log_record_type type;
 	/**
-	 * Unique ID of the vinyl index.
-	 *
-	 * The ID must be unique for different incarnations of
-	 * the same index, so we use LSN from the time of index
-	 * creation for it.
+	 * LSN from the time of index creation.
+	 * Used to identify indexes in vylog.
 	 */
 	int64_t index_lsn;
 	/** Unique ID of the vinyl range. */
@@ -389,9 +386,39 @@ vy_recovery_iterate(struct vy_recovery *recovery,
  *
  * Note, this function returns 0 if there's no index with the requested
  * id in the recovery context. In this case, @cb isn't called at all.
+ *
+ * The @snapshot_recovery flag indicates that the row that created
+ * the index was loaded from a snapshot, in which case @index_lsn is
+ * the snapshot signature. Otherwise @index_lsn is the LSN of the WAL
+ * row that created the index.
+ *
+ * The index is looked up by @space_id and @index_id while @index_lsn
+ * is used to discern different incarnations of the same index as
+ * follows. Let @record denote the vylog record corresponding to the
+ * last incarnation of the index. Then
+ *
+ * - If @snapshot_recovery is set and @index_lsn >= @record->index_lsn,
+ *   the last index incarnation was created before the snapshot and we
+ *   need to load it right now.
+ *
+ * - If @snapshot_recovery is set and @index_lsn < @record->index_lsn,
+ *   the last index incarnation was created after the snapshot, i.e.
+ *   the index loaded now is going to be dropped so load a dummy.
+ *
+ * - If @snapshot_recovery is unset and @index_lsn < @record->index_lsn,
+ *   the last index incarnation is created further in WAL, load a dummy.
+ *
+ * - If @snapshot_recovery is unset and @index_lsn == @record->index_lsn,
+ *   load the last index incarnation.
+ *
+ * - If @snapshot_recovery is unset and @index_lsn > @record->index_lsn,
+ *   it seems we failed to log index creation before restart. In this
+ *   case don't do anything. The caller is supposed to retry logging.
  */
 int
-vy_recovery_load_index(struct vy_recovery *recovery, int64_t index_lsn,
+vy_recovery_load_index(struct vy_recovery *recovery,
+		       uint32_t space_id, uint32_t index_id,
+		       int64_t index_lsn, bool snapshot_recovery,
 		       vy_recovery_cb cb, void *cb_arg);
 
 /**
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index e75a9aa1cc..2ec06ad9a0 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -121,19 +121,19 @@ result
     - - HEADER:
           type: INSERT
         BODY:
-          tuple: [0, {0: 2, 7: [[0, 'unsigned']], 6: 512}]
+          tuple: [0, {0: 3, 7: [[0, 'unsigned']], 6: 512}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [10, {0: 2, 9: 8}]
+          tuple: [10, {0: 3, 9: 8}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [5, {0: 2, 2: 2, 9: 8}]
+          tuple: [5, {0: 3, 2: 2, 9: 8}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [4, {0: 2, 2: 1}]
+          tuple: [4, {0: 3, 2: 1}]
       - HEADER:
           type: INSERT
         BODY:
@@ -141,7 +141,7 @@ result
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [2, {0: 2}]
+          tuple: [2, {0: 3}]
       - HEADER:
           type: INSERT
         BODY:
@@ -159,12 +159,12 @@ result
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [4, {0: 2, 2: 4}]
+          tuple: [4, {0: 3, 2: 4}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [5, {0: 2, 2: 4, 9: 11}]
+          tuple: [5, {0: 3, 2: 4, 9: 11}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
@@ -174,7 +174,7 @@ result
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [10, {0: 2, 9: 11}]
+          tuple: [10, {0: 3, 9: 11}]
   - - 00000000000000000002.index
     - - HEADER:
           type: RUNINFO
-- 
GitLab