diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 18aa1ba5b6188805e4bcea1b2d79aad0b977f485..fb1214023e2b6c4e61cdc6c1a4165f1e9bd37f5c 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1273,25 +1273,45 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
 			  struct tuple *tuple, struct tuple **result)
 {
 	assert(lsm->index_id > 0);
-	/*
-	 * No need in vy_tx_track() as the tuple must already be
-	 * tracked in the secondary index LSM tree.
-	 */
+
 	if (vy_point_lookup(lsm->pk, tx, rv, tuple, result) != 0)
 		return -1;
 
-	if (*result == NULL) {
+	if (*result == NULL ||
+	    vy_tuple_compare(*result, tuple, lsm->key_def) != 0) {
+		/*
+		 * If a tuple read from a secondary index doesn't
+		 * match the tuple corresponding to it in the
+		 * primary index, it must have been overwritten or
+		 * deleted, but the DELETE statement hasn't been
+		 * propagated to the secondary index yet. In this
+		 * case silently skip this tuple.
+		 */
+		if (*result != NULL) {
+			tuple_unref(*result);
+			*result = NULL;
+		}
 		/*
-		 * All indexes of a space must be consistent, i.e.
-		 * if a tuple is present in one index, it must be
-		 * present in all other indexes as well, so we can
-		 * get here only if there's a bug somewhere in vinyl.
-		 * Don't abort as core dump won't really help us in
-		 * this case. Just warn the user and proceed to the
-		 * next tuple.
+		 * We must purge stale tuples from the cache before
+		 * storing the resulting interval in order to avoid
+		 * chain intersections, which are not tolerated by
+		 * the tuple cache implementation.
 		 */
-		say_warn("%s: key %s missing in primary index",
-			 vy_lsm_name(lsm), vy_stmt_str(tuple));
+		vy_cache_on_write(&lsm->cache, tuple, NULL);
+		return 0;
+	}
+
+	/*
+	 * Even though the tuple is tracked in the secondary index
+	 * read set, we still must track the full tuple read from
+	 * the primary index, otherwise the transaction won't be
+	 * aborted if this tuple is overwritten or deleted, because
+	 * the DELETE statement is not written to secondary indexes
+	 * immediately.
+	 */
+	if (tx != NULL && vy_tx_track_point(tx, lsm->pk, *result) != 0) {
+		tuple_unref(*result);
+		return -1;
 	}
 
 	if ((*rv)->vlsn == INT64_MAX)
@@ -1604,7 +1624,6 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
 	struct vy_lsm *lsm = vy_lsm_find_unique(space, request->index_id);
 	if (lsm == NULL)
 		return -1;
-	bool has_secondary = space->index_count > 1;
 	const char *key = request->key;
 	uint32_t part_count = mp_decode_array(&key);
 	if (vy_unique_key_validate(lsm, key, part_count))
@@ -1614,12 +1633,9 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
 	 * before deletion.
 	 * - if the space has on_replace triggers and need to pass
 	 *   to them the old tuple.
-	 *
-	 * - if the space has one or more secondary indexes, then
-	 *   we need to extract secondary keys from the old tuple
-	 *   and pass them to indexes for deletion.
+	 * - if deletion is done by a secondary index.
 	 */
-	if (has_secondary || !rlist_empty(&space->on_replace)) {
+	if (lsm->index_id > 0 || !rlist_empty(&space->on_replace)) {
 		if (vy_get_by_raw_key(lsm, tx, vy_tx_read_view(tx),
 				      key, part_count, &stmt->old_tuple) != 0)
 			return -1;
@@ -1628,8 +1644,7 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
 	}
 	int rc = 0;
 	struct tuple *delete;
-	if (has_secondary) {
-		assert(stmt->old_tuple != NULL);
+	if (stmt->old_tuple != NULL) {
 		delete = vy_stmt_new_surrogate_delete(pk->mem_format,
 						      stmt->old_tuple);
 		if (delete == NULL)
@@ -1642,12 +1657,14 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
 			if (rc != 0)
 				break;
 		}
-	} else { /* Primary is the single index in the space. */
+	} else {
 		assert(lsm->index_id == 0);
 		delete = vy_stmt_new_surrogate_delete_from_key(request->key,
 						pk->key_def, pk->mem_format);
 		if (delete == NULL)
 			return -1;
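+		/*
+		 * If the space has secondary indexes, the old tuple
+		 * is unknown here, so mark the statement so that
+		 * DELETE statements for secondary indexes will be
+		 * generated later, when the old tuple is found.
+		 */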
+		if (space->index_count > 1)
+			vy_stmt_set_flags(delete, VY_STMT_DEFERRED_DELETE);
 		rc = vy_tx_set(tx, pk, delete);
 	}
 	tuple_unref(delete);
@@ -2166,11 +2183,9 @@ vy_replace(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
 	/*
 	 * Get the overwritten tuple from the primary index if
 	 * the space has on_replace triggers, in which case we
-	 * need to pass the old tuple to trigger callbacks, or
-	 * if the space has secondary indexes and so we need
-	 * the old tuple to delete it from them.
+	 * need to pass the old tuple to trigger callbacks.
 	 */
-	if (space->index_count > 1 || !rlist_empty(&space->on_replace)) {
+	if (!rlist_empty(&space->on_replace)) {
 		if (vy_get(pk, tx, vy_tx_read_view(tx),
 			   stmt->new_tuple, &stmt->old_tuple) != 0)
 			return -1;
@@ -2181,6 +2196,8 @@ vy_replace(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
 			 */
 			vy_stmt_set_type(stmt->new_tuple, IPROTO_INSERT);
 		}
+	} else if (space->index_count > 1) {
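+		/*
+		 * The old tuple is not needed here, so don't look
+		 * it up: defer generation of DELETE statements for
+		 * secondary indexes until the old tuple is found,
+		 * on commit or on primary index compaction.
+		 */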
+		vy_stmt_set_flags(stmt->new_tuple, VY_STMT_DEFERRED_DELETE);
 	}
 	/*
 	 * Replace in the primary index without explicit deletion
@@ -4286,11 +4303,192 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 
 /* {{{ Deferred DELETE handling */
 
+static void
+vy_deferred_delete_on_commit(struct trigger *trigger, void *event)
+{
+	struct txn *txn = event;
+	struct vy_mem *mem = trigger->data;
+	/*
+	 * Update dump_lsn so that we can skip dumped deferred
+	 * DELETE statements on WAL recovery.
+	 */
+	assert(mem->dump_lsn <= txn->signature);
+	mem->dump_lsn = txn->signature;
+	/* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
+	vy_mem_unpin(mem);
+}
+
+static void
+vy_deferred_delete_on_rollback(struct trigger *trigger, void *event)
+{
+	(void)event;
+	struct vy_mem *mem = trigger->data;
+	/* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
+	vy_mem_unpin(mem);
+}
+
+/**
+ * Callback invoked when a deferred DELETE statement is written
+ * to _vinyl_deferred_delete system space. It extracts the
+ * deleted tuple, its LSN, and the target space id from the
+ * system space row, then generates a deferred DELETE statement
+ * and inserts it into secondary indexes of the target space.
+ *
+ * Note, this callback is also invoked during local WAL recovery
+ * to restore deferred DELETE statements that haven't been dumped
+ * to disk. To skip deferred DELETEs that have been dumped, we
+ * use the same technique we employ for normal WAL statements,
+ * i.e. we filter them by LSN, see vy_is_committed_one(). To do
+ * that, we need to account the LSN of a WAL row that generated
+ * a deferred DELETE statement in vy_lsm::dump_lsn, so we install
+ * an on_commit trigger that propagates the LSN of the WAL row to
+ * vy_mem::dump_lsn, which in turn will contribute to
+ * vy_lsm::dump_lsn when the in-memory tree is dumped, see
+ * vy_task_dump_new().
+ *
+ * This implies that we don't yield between statements of the
+ * same transaction, because if we did, two deferred DELETEs with
+ * the same WAL LSN could land in different in-memory trees: if
+ * one of the trees got dumped while the other didn't, we would
+ * mistakenly skip both statements on recovery.
+ */
 static void
 vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 {
 	(void)trigger;
-	(void)event;
+
+	struct txn *txn = event;
+	struct txn_stmt *stmt = txn_current_stmt(txn);
+	bool is_first_statement = txn_is_first_statement(txn);
+
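+	/* Nothing to do unless a new tuple was inserted. */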
+	if (stmt->new_tuple == NULL)
+		return;
+	/*
+	 * Extract space id, LSN of the deferred DELETE statement,
+	 * and the deleted tuple from the system space row.
+	 */
+	struct tuple_iterator it;
+	tuple_rewind(&it, stmt->new_tuple);
+	uint32_t space_id;
+	if (tuple_next_u32(&it, &space_id) != 0)
+		diag_raise();
+	uint64_t lsn;
+	if (tuple_next_u64(&it, &lsn) != 0)
+		diag_raise();
+	const char *delete_data = tuple_next_with_type(&it, MP_ARRAY);
+	if (delete_data == NULL)
+		diag_raise();
+	const char *delete_data_end = delete_data;
+	mp_next(&delete_data_end);
+
+	/* Look up the space. */
+	struct space *space = space_cache_find(space_id);
+	if (space == NULL)
+		diag_raise();
+	/*
+	 * All secondary indexes could have been dropped, in
+	 * which case we don't need to generate deferred DELETE
+	 * statements anymore.
+	 */
+	if (space->index_count <= 1)
+		return;
+	/*
+	 * Wait for memory quota if necessary before starting to
+	 * process the batch (we can't yield between statements).
+	 */
+	struct vy_env *env = vy_env(space->engine);
+	if (is_first_statement)
+		vy_quota_wait(&env->quota);
+
+	/* Create the deferred DELETE statement. */
+	struct vy_lsm *pk = vy_lsm(space->index[0]);
+	struct tuple *delete = vy_stmt_new_surrogate_delete_raw(pk->mem_format,
+						delete_data, delete_data_end);
+	if (delete == NULL)
+		diag_raise();
+	/*
+	 * A deferred DELETE may be generated after new statements
+	 * were committed for the deleted key. So we must use the
+	 * original LSN (not the one of the WAL row) when inserting
+	 * a deferred DELETE into an index to make sure that it will
+	 * purge the appropriate tuple on compaction. However, this
+	 * also breaks the read iterator invariant that states that
+	 * newer sources contain newer statements for the same key.
+	 * So we mark deferred DELETEs with the VY_STMT_SKIP_READ
+	 * flag, which makes the read iterator ignore them.
+	 */
+	vy_stmt_set_lsn(delete, lsn);
+	vy_stmt_set_flags(delete, VY_STMT_SKIP_READ);
+
+	/* Insert the deferred DELETE into secondary indexes. */
+	int rc = 0;
+	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
+	const struct tuple *region_stmt = NULL;
+	for (uint32_t i = 1; i < space->index_count; i++) {
+		struct vy_lsm *lsm = vy_lsm(space->index[i]);
+		if (vy_is_committed_one(env, lsm))
+			continue;
+		/*
+		 * As usual, rotate the active in-memory index if
+		 * schema was changed or dump was triggered. Do it
+		 * only if processing the first statement, because
+		 * dump may be triggered by one of the statements
+		 * of this transaction (see vy_quota_force_use()
+		 * below), in which case we must not do rotation
+		 * as we want all statements to land in the same
+		 * in-memory index. This is safe, as long as we
+		 * don't yield between statements.
+		 */
+		struct vy_mem *mem = lsm->mem;
+		if (is_first_statement &&
+		    (mem->space_cache_version != space_cache_version ||
+		     mem->generation != *lsm->env->p_generation)) {
+			rc = vy_lsm_rotate_mem(lsm);
+			if (rc != 0)
+				break;
+			mem = lsm->mem;
+		}
+		rc = vy_lsm_set(lsm, mem, delete, &region_stmt);
+		if (rc != 0)
+			break;
+		vy_lsm_commit_stmt(lsm, mem, region_stmt);
+
+		if (!is_first_statement)
+			continue;
+		/*
+		 * If this is the first statement of this
+		 * transaction, install on_commit trigger
+		 * which will propagate the WAL row LSN to
+		 * the LSM tree.
+		 */
+		struct trigger *on_commit = region_alloc(&fiber()->gc,
+							 sizeof(*on_commit));
+		if (on_commit == NULL) {
+			diag_set(OutOfMemory, sizeof(*on_commit),
+				 "region", "struct trigger");
+			rc = -1;
+			break;
+		}
+		struct trigger *on_rollback = region_alloc(&fiber()->gc,
+							   sizeof(*on_rollback));
+		if (on_rollback == NULL) {
+			diag_set(OutOfMemory, sizeof(*on_rollback),
+				 "region", "struct trigger");
+			rc = -1;
+			break;
+		}
+		vy_mem_pin(mem);
+		trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL);
+		trigger_create(on_rollback, vy_deferred_delete_on_rollback, mem, NULL);
+		txn_on_commit(txn, on_commit);
+		txn_on_rollback(txn, on_rollback);
+	}
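+	/*
+	 * Take the memory consumed by the new statements from the
+	 * quota, even if it exceeds the limit - the statements have
+	 * already been inserted, so we can't fail here.
+	 */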
+	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
+	assert(mem_used_after >= mem_used_before);
+	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+
+	tuple_unref(delete);
+	if (rc != 0)
+		diag_raise();
 }
 
 static struct trigger on_replace_vinyl_deferred_delete = {
diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h
index f0b7ec9c020c55afedb69b931cb44da8259e4866..d2aa0c43b12321764ba946de19e571dff5082148 100644
--- a/src/box/vy_lsm.h
+++ b/src/box/vy_lsm.h
@@ -50,6 +50,7 @@ extern "C" {
 #endif /* defined(__cplusplus) */
 
 struct histogram;
+struct index;
 struct tuple;
 struct tuple_format;
 struct vy_lsm;
@@ -292,6 +293,10 @@ struct vy_lsm {
 	vy_lsm_read_set_t read_set;
 };
 
+/** Extract vy_lsm from an index object. */
+struct vy_lsm *
+vy_lsm(struct index *index);
+
 /** Return LSM tree name. Used for logging. */
 const char *
 vy_lsm_name(struct vy_lsm *lsm);
diff --git a/src/box/vy_mem.h b/src/box/vy_mem.h
index 957f549f7c0133932204e39bce03f75b18f6068c..29b60ac7f7468d464ea2f9ade1040c8dd2eaca6f 100644
--- a/src/box/vy_mem.h
+++ b/src/box/vy_mem.h
@@ -171,6 +171,12 @@ struct vy_mem {
 	 *
 	 * Once the tree is dumped to disk it will be used to update
 	 * vy_lsm::dump_lsn, see vy_task_dump_new().
+	 *
+	 * Note, we account not only the original LSN (vy_stmt_lsn())
+	 * in this variable, but also the WAL LSN of deferred DELETE
+	 * statements. This is needed to skip replaying both deferred
+	 * and normal statements that have been dumped to disk on
+	 * WAL recovery. See vy_deferred_delete_on_replace() for
+	 * more details.
 	 */
 	int64_t dump_lsn;
 	/**
diff --git a/src/box/vy_point_lookup.c b/src/box/vy_point_lookup.c
index 5e43340b90fa76d154e610bde9545c29764a9bec..7b704b849015c01c5df7f5f769f22f09b20ee1b9 100644
--- a/src/box/vy_point_lookup.c
+++ b/src/box/vy_point_lookup.c
@@ -293,3 +293,35 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx,
 	}
 	return 0;
 }
+
+int
+vy_point_lookup_mem(struct vy_lsm *lsm, const struct vy_read_view **rv,
+		    struct tuple *key, struct tuple **ret)
+{
+	assert(tuple_field_count(key) >= lsm->cmp_def->part_count);
+
+	int rc;
+	struct vy_history history;
+	vy_history_create(&history, &lsm->env->history_node_pool);
+
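+	/* Check the tuple cache. */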
+	rc = vy_point_lookup_scan_cache(lsm, rv, key, &history);
+	if (rc != 0 || vy_history_is_terminal(&history))
+		goto done;
+
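+	/* Scan the in-memory trees. */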
+	rc = vy_point_lookup_scan_mems(lsm, rv, key, &history);
+	if (rc != 0 || vy_history_is_terminal(&history))
+		goto done;
+
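+	/*
+	 * No terminal statement found in memory. Matching
+	 * statements may still be stored on disk.
+	 */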
+	*ret = NULL;
+	goto out;
+done:
+	if (rc == 0) {
+		int upserts_applied;
+		rc = vy_history_apply(&history, lsm->cmp_def, lsm->mem_format,
+				      true, &upserts_applied, ret);
+		lsm->stat.upsert.applied += upserts_applied;
+	}
+out:
+	vy_history_cleanup(&history);
+	return rc;
+}
diff --git a/src/box/vy_point_lookup.h b/src/box/vy_point_lookup.h
index 3b7c5a04cd8ce5fa2838bec238205ccefa274bf6..6d77ce9c16502c2730eb76983149b61b5f835dbc 100644
--- a/src/box/vy_point_lookup.h
+++ b/src/box/vy_point_lookup.h
@@ -71,6 +71,24 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx,
 		const struct vy_read_view **rv,
 		struct tuple *key, struct tuple **ret);
 
+/**
+ * Look up a tuple by key in memory.
+ *
+ * This function works just like vy_point_lookup() except:
+ *
+ * - It only scans the in-memory level and the cache and hence
+ *   doesn't yield.
+ * - It doesn't turn a DELETE into NULL, so it returns NULL if and
+ *   only if no terminal statement matching the key is present in
+ *   memory (there may still be statements stored on disk though).
+ * - It doesn't account the lookup in LSM tree stats (as it never
+ *   descends to lower levels).
+ *
+ * The function returns 0 on success, -1 on memory allocation error.
+ */
+int
+vy_point_lookup_mem(struct vy_lsm *lsm, const struct vy_read_view **rv,
+		    struct tuple *key, struct tuple **ret);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 7d8961c4a029488df0bad3bea364e0a7e08646ac..64975f3ad7576a2cd34b2b8804c3697417bde49a 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -49,6 +49,10 @@
 #include "cbus.h"
 #include "salad/stailq.h"
 #include "say.h"
+#include "txn.h"
+#include "space.h"
+#include "schema.h"
+#include "xrow.h"
 #include "vy_lsm.h"
 #include "vy_log.h"
 #include "vy_mem.h"
@@ -65,6 +69,8 @@ static int vy_worker_f(va_list);
 static int vy_scheduler_f(va_list);
 static void vy_task_execute_f(struct cmsg *);
 static void vy_task_complete_f(struct cmsg *);
+static void vy_deferred_delete_batch_process_f(struct cmsg *);
+static void vy_deferred_delete_batch_free_f(struct cmsg *);
 
 static const struct cmsg_hop vy_task_execute_route[] = {
 	{ vy_task_execute_f, NULL },
@@ -83,10 +89,42 @@ struct vy_worker {
 	struct cpipe tx_pipe;
 	/** Link in vy_scheduler::idle_workers. */
 	struct stailq_entry in_idle;
+	/** Route for sending deferred DELETEs back to tx. */
+	struct cmsg_hop deferred_delete_route[2];
 };
 
 struct vy_task;
 
+/** Max number of statements in a batch of deferred DELETEs. */
+enum { VY_DEFERRED_DELETE_BATCH_MAX = 100 };
+
+/** Deferred DELETE statement. */
+struct vy_deferred_delete_stmt {
+	/** Overwritten tuple. */
+	struct tuple *old_stmt;
+	/** Statement that overwrote @old_stmt. */
+	struct tuple *new_stmt;
+};
+
+/**
+ * Batch of deferred DELETE statements generated during
+ * a primary index compaction.
+ */
+struct vy_deferred_delete_batch {
+	/** CBus messages for sending the batch to tx. */
+	struct cmsg cmsg;
+	/** Task that generated this batch. */
+	struct vy_task *task;
+	/** Set if the tx thread failed to process the batch. */
+	bool is_failed;
+	/** In case of failure the error is stored here. */
+	struct diag diag;
+	/** Number of elements actually stored in @stmt array. */
+	int count;
+	/** Array of deferred DELETE statements. */
+	struct vy_deferred_delete_stmt stmt[VY_DEFERRED_DELETE_BATCH_MAX];
+};
+
 struct vy_task_ops {
 	/**
 	 * This function is called from a worker. It is supposed to do work
@@ -159,10 +197,26 @@ struct vy_task {
 	 */
 	double bloom_fpr;
 	int64_t page_size;
+	/**
+	 * Deferred DELETE handler passed to the write iterator.
+	 * It sends deferred DELETE statements generated during
+	 * primary index compaction back to tx.
+	 */
+	struct vy_deferred_delete_handler deferred_delete_handler;
+	/** Batch of deferred deletes generated by this task. */
+	struct vy_deferred_delete_batch *deferred_delete_batch;
+	/**
+	 * Number of batches of deferred DELETEs sent to tx
+	 * and not yet processed.
+	 */
+	int deferred_delete_in_progress;
 	/** Link in vy_scheduler::processed_tasks. */
 	struct stailq_entry in_processed;
 };
 
+static const struct vy_deferred_delete_handler_iface
+vy_task_deferred_delete_iface;
+
 /**
  * Allocate a new task to be executed by a worker thread.
  * When preparing an asynchronous task, this function must
@@ -197,6 +251,7 @@ vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
 	}
 	vy_lsm_ref(lsm);
 	diag_create(&task->diag);
+	task->deferred_delete_handler.iface = &vy_task_deferred_delete_iface;
 	return task;
 }
 
@@ -204,6 +259,8 @@ vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
 static void
 vy_task_delete(struct vy_task *task)
 {
+	assert(task->deferred_delete_batch == NULL);
+	assert(task->deferred_delete_in_progress == 0);
 	key_def_delete(task->cmp_def);
 	key_def_delete(task->key_def);
 	vy_lsm_unref(task->lsm);
@@ -293,6 +350,12 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 		cpipe_create(&worker->worker_pipe, name);
 		stailq_add_tail_entry(&scheduler->idle_workers,
 				      worker, in_idle);
+
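+		/*
+		 * A batch of deferred DELETEs is first processed
+		 * in tx, then sent back to the worker for cleanup.
+		 */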
+		struct cmsg_hop *route = worker->deferred_delete_route;
+		route[0].f = vy_deferred_delete_batch_process_f;
+		route[0].pipe = &worker->worker_pipe;
+		route[1].f = vy_deferred_delete_batch_free_f;
+		route[1].pipe = NULL;
 	}
 }
 
@@ -652,6 +715,237 @@ vy_run_discard(struct vy_run *run)
 	vy_log_tx_try_commit();
 }
 
+/**
+ * Encode and write a single deferred DELETE statement to the
+ * _vinyl_deferred_delete system space. The rest will be done
+ * by the on_replace trigger installed on that space.
+ */
+static int
+vy_deferred_delete_process_one(struct space *deferred_delete_space,
+			       uint32_t space_id, struct tuple_format *format,
+			       struct vy_deferred_delete_stmt *stmt)
+{
+	int64_t lsn = vy_stmt_lsn(stmt->new_stmt);
+
+	struct tuple *delete;
+	delete = vy_stmt_new_surrogate_delete(format, stmt->old_stmt);
+	if (delete == NULL)
+		return -1;
+
+	uint32_t delete_data_size;
+	const char *delete_data = tuple_data_range(delete, &delete_data_size);
+
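+	/* Encode the _vinyl_deferred_delete row: [space_id, lsn, tuple]. */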
+	size_t buf_size = (mp_sizeof_array(3) + mp_sizeof_uint(space_id) +
+			   mp_sizeof_uint(lsn) + delete_data_size);
+	char *data = region_alloc(&fiber()->gc, buf_size);
+	if (data == NULL) {
+		diag_set(OutOfMemory, buf_size, "region", "buf");
+		tuple_unref(delete);
+		return -1;
+	}
+
+	char *data_end = data;
+	data_end = mp_encode_array(data_end, 3);
+	data_end = mp_encode_uint(data_end, space_id);
+	data_end = mp_encode_uint(data_end, lsn);
+	memcpy(data_end, delete_data, delete_data_size);
+	data_end += delete_data_size;
+	assert(data_end <= data + buf_size);
+
+	struct request request;
+	memset(&request, 0, sizeof(request));
+	request.type = IPROTO_REPLACE;
+	request.space_id = BOX_VINYL_DEFERRED_DELETE_ID;
+	request.tuple = data;
+	request.tuple_end = data_end;
+
+	tuple_unref(delete);
+
+	struct txn *txn = txn_begin_stmt(deferred_delete_space);
+	if (txn == NULL)
+		return -1;
+
+	struct tuple *unused;
+	if (space_execute_dml(deferred_delete_space, txn,
+			      &request, &unused) != 0) {
+		txn_rollback_stmt();
+		return -1;
+	}
+	return txn_commit_stmt(txn, &request);
+}
+
+/**
+ * Callback invoked by the tx thread to process deferred DELETE
+ * statements generated during compaction. It writes deferred
+ * DELETEs to a special system space, _vinyl_deferred_delete.
+ * The system space has an on_replace trigger installed which
+ * propagates the DELETEs to secondary indexes. This way, even
+ * if a deferred DELETE isn't dumped to disk by vinyl, it still
+ * can be recovered from WAL.
+ */
+static void
+vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
+{
+	struct vy_deferred_delete_batch *batch = container_of(cmsg,
+				struct vy_deferred_delete_batch, cmsg);
+	struct vy_task *task = batch->task;
+	struct vy_lsm *pk = task->lsm;
+
+	assert(pk->index_id == 0);
+	/*
+	 * A space can be dropped while a compaction task
+	 * is in progress.
+	 */
+	if (pk->is_dropped)
+		return;
+
+	struct space *deferred_delete_space;
+	deferred_delete_space = space_by_id(BOX_VINYL_DEFERRED_DELETE_ID);
+	assert(deferred_delete_space != NULL);
+
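+	/* Write the whole batch in a single WAL transaction. */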
+	struct txn *txn = txn_begin(false);
+	if (txn == NULL)
+		goto fail;
+
+	for (int i = 0; i < batch->count; i++) {
+		if (vy_deferred_delete_process_one(deferred_delete_space,
+						   pk->space_id, pk->mem_format,
+						   &batch->stmt[i]) != 0)
+			goto fail;
+	}
+
+	if (txn_commit(txn) != 0)
+		goto fail;
+
+	return;
+fail:
+	batch->is_failed = true;
+	diag_move(diag_get(), &batch->diag);
+	txn_rollback();
+}
+
+/**
+ * Callback invoked by a worker thread to free processed deferred
+ * DELETE statements. It must be done on behalf of the worker
+ * thread that generated those DELETEs, because a vinyl statement
+ * cannot be allocated and freed in different threads.
+ */
+static void
+vy_deferred_delete_batch_free_f(struct cmsg *cmsg)
+{
+	struct vy_deferred_delete_batch *batch = container_of(cmsg,
+				struct vy_deferred_delete_batch, cmsg);
+	struct vy_task *task = batch->task;
+	for (int i = 0; i < batch->count; i++) {
+		struct vy_deferred_delete_stmt *stmt = &batch->stmt[i];
+		vy_stmt_unref_if_possible(stmt->old_stmt);
+		vy_stmt_unref_if_possible(stmt->new_stmt);
+	}
+	/*
+	 * Abort the task if the tx thread failed to process
+	 * the batch unless it has already been aborted.
+	 */
+	if (batch->is_failed && !task->is_failed) {
+		assert(!diag_is_empty(&batch->diag));
+		diag_move(&batch->diag, &task->diag);
+		task->is_failed = true;
+		fiber_cancel(task->fiber);
+	}
+	diag_destroy(&batch->diag);
+	free(batch);
+	/* Notify the caller if this is the last batch. */
+	assert(task->deferred_delete_in_progress > 0);
+	if (--task->deferred_delete_in_progress == 0)
+		fiber_wakeup(task->fiber);
+}
+
+/**
+ * Send all deferred DELETEs accumulated by a vinyl task to
+ * the tx thread where they will be processed.
+ */
+static void
+vy_task_deferred_delete_flush(struct vy_task *task)
+{
+	struct vy_worker *worker = task->worker;
+	struct vy_deferred_delete_batch *batch = task->deferred_delete_batch;
+
+	if (batch == NULL)
+		return;
+
+	task->deferred_delete_batch = NULL;
+	task->deferred_delete_in_progress++;
+
+	cmsg_init(&batch->cmsg, worker->deferred_delete_route);
+	cpipe_push(&worker->tx_pipe, &batch->cmsg);
+}
+
+/**
+ * Add a deferred DELETE to a batch. Once the batch gets full,
+ * submit it to tx where it will get processed.
+ */
+static int
+vy_task_deferred_delete_process(struct vy_deferred_delete_handler *handler,
+				struct tuple *old_stmt, struct tuple *new_stmt)
+{
+	enum { MAX_IN_PROGRESS = 10 };
+
+	struct vy_task *task = container_of(handler, struct vy_task,
+					    deferred_delete_handler);
+	struct vy_deferred_delete_batch *batch = task->deferred_delete_batch;
+
+	/*
+	 * Throttle compaction task if there are too many batches
+	 * being processed so as to limit memory consumption.
+	 */
+	while (task->deferred_delete_in_progress >= MAX_IN_PROGRESS)
+		fiber_sleep(TIMEOUT_INFINITY);
+
+	/* Allocate a new batch on demand. */
+	if (batch == NULL) {
+		batch = malloc(sizeof(*batch));
+		if (batch == NULL) {
+			diag_set(OutOfMemory, sizeof(*batch), "malloc",
+				 "struct vy_deferred_delete_batch");
+			return -1;
+		}
+		memset(batch, 0, sizeof(*batch));
+		batch->task = task;
+		diag_create(&batch->diag);
+		task->deferred_delete_batch = batch;
+	}
+
+	assert(batch->count < VY_DEFERRED_DELETE_BATCH_MAX);
+	struct vy_deferred_delete_stmt *stmt = &batch->stmt[batch->count++];
+	stmt->old_stmt = old_stmt;
+	vy_stmt_ref_if_possible(old_stmt);
+	stmt->new_stmt = new_stmt;
+	vy_stmt_ref_if_possible(new_stmt);
+
+	if (batch->count == VY_DEFERRED_DELETE_BATCH_MAX)
+		vy_task_deferred_delete_flush(task);
+	return 0;
+}
+
+/**
+ * Wait until all pending deferred DELETE statements have been
+ * processed by tx. Called when the write iterator stops.
+ */
+static void
+vy_task_deferred_delete_destroy(struct vy_deferred_delete_handler *handler)
+{
+	struct vy_task *task = container_of(handler, struct vy_task,
+					    deferred_delete_handler);
+	vy_task_deferred_delete_flush(task);
+	while (task->deferred_delete_in_progress > 0)
+		fiber_sleep(TIMEOUT_INFINITY);
+}
+
+static const struct vy_deferred_delete_handler_iface
+vy_task_deferred_delete_iface = {
+	.process = vy_task_deferred_delete_process,
+	.destroy = vy_task_deferred_delete_destroy,
+};
+
 static int
 vy_task_write_run(struct vy_task *task)
 {
@@ -1002,6 +1296,12 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
 
 	new_run->dump_lsn = dump_lsn;
 
+	/*
+	 * Note, since deferred DELETEs are generated on tx commit
+	 * whenever the overwritten tuple is found in memory, no
+	 * deferred DELETE statement can be generated during dump,
+	 * so we don't pass a deferred DELETE handler.
+	 */
 	struct vy_stmt_stream *wi;
 	bool is_last_level = (lsm->run_count == 0);
 	wi = vy_write_iterator_new(task->cmp_def, lsm->disk_format,
@@ -1273,7 +1573,9 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
 	bool is_last_level = (range->compact_priority == range->slice_count);
 	wi = vy_write_iterator_new(task->cmp_def, lsm->disk_format,
 				   lsm->index_id == 0, is_last_level,
-				   scheduler->read_views, NULL);
+				   scheduler->read_views,
+				   lsm->index_id > 0 ? NULL :
+				   &task->deferred_delete_handler);
 	if (wi == NULL)
 		goto err_wi;
 
@@ -1336,7 +1638,7 @@ static int
 vy_task_f(va_list va)
 {
 	struct vy_task *task = va_arg(va, struct vy_task *);
-	if (task->ops->execute(task) != 0) {
+	if (task->ops->execute(task) != 0 && !task->is_failed) {
 		struct diag *diag = diag_get();
 		assert(!diag_is_empty(diag));
 		task->is_failed = true;
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index c7a6d9ff971c6bfee553ca9b7bb0bdeb8ef13faf..590b4483297184ae98af92b151175fad9bdd1899 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -46,6 +46,7 @@
 #include "iterator_type.h"
 #include "salad/stailq.h"
 #include "schema.h" /* space_cache_version */
+#include "space.h"
 #include "trigger.h"
 #include "trivia/util.h"
 #include "tuple.h"
@@ -58,6 +59,7 @@
 #include "vy_history.h"
 #include "vy_read_set.h"
 #include "vy_read_view.h"
+#include "vy_point_lookup.h"
 
 int
 write_set_cmp(struct txv *a, struct txv *b)
@@ -468,6 +470,112 @@ vy_tx_write(struct vy_lsm *lsm, struct vy_mem *mem,
 	return vy_lsm_set(lsm, mem, stmt, region_stmt);
 }
 
+/**
+ * Try to generate a deferred DELETE statement on tx commit.
+ *
+ * This function is supposed to be called for a primary index
+ * statement that was executed without deleting the overwritten
+ * tuple from secondary indexes. It looks up the overwritten tuple
+ * in memory and, if found, produces the deferred DELETEs and
+ * inserts them into the transaction log.
+ *
+ * Generating DELETEs before committing a transaction rather than
+ * postponing them until dump isn't just an optimization. The point is
+ * that we can't generate deferred DELETEs during dump, because
+ * if we run out of memory, we won't be able to schedule another
+ * dump to free some.
+ *
+ * Affects @tx->log, @v->stmt.
+ *
+ * Returns 0 on success, -1 on memory allocation error.
+ */
+static int
+vy_tx_handle_deferred_delete(struct vy_tx *tx, struct txv *v)
+{
+	struct vy_lsm *pk = v->lsm;
+	struct tuple *stmt = v->stmt;
+	uint8_t flags = vy_stmt_flags(stmt);
+
+	assert(pk->index_id == 0);
+	assert(flags & VY_STMT_DEFERRED_DELETE);
+
+	struct space *space = space_cache_find(pk->space_id);
+	if (space == NULL) {
+		/*
+		 * Space was dropped while transaction was
+		 * in progress. Nothing to do.
+		 */
+		return 0;
+	}
+
+	/* Look up the tuple overwritten by this statement. */
+	struct tuple *tuple;
+	if (vy_point_lookup_mem(pk, &tx->xm->p_global_read_view,
+				stmt, &tuple) != 0)
+		return -1;
+
+	if (tuple == NULL) {
+		/*
+		 * Nothing's found, but there still may be
+		 * matching statements stored on disk so we
+		 * have to defer generation of DELETE until
+		 * compaction.
+		 */
+		return 0;
+	}
+
+	/*
+	 * If a terminal statement is found, we can produce
+	 * DELETE right away so clear the flag now.
+	 */
+	vy_stmt_set_flags(stmt, flags & ~VY_STMT_DEFERRED_DELETE);
+
+	if (vy_stmt_type(tuple) == IPROTO_DELETE) {
+		/* The tuple's already deleted, nothing to do. */
+		tuple_unref(tuple);
+		return 0;
+	}
+
+	struct tuple *delete_stmt;
+	delete_stmt = vy_stmt_new_surrogate_delete(pk->mem_format, tuple);
+	tuple_unref(tuple);
+	if (delete_stmt == NULL)
+		return -1;
+
+	if (vy_stmt_type(stmt) == IPROTO_DELETE) {
+		/*
+		 * Since primary and secondary indexes of the
+		 * same space share in-memory statements, we
+		 * need to use the new DELETE in the primary
+		 * index, because the original DELETE doesn't
+		 * contain secondary key parts.
+		 */
+		vy_stmt_counter_acct_tuple(&pk->stat.txw.count, delete_stmt);
+		vy_stmt_counter_unacct_tuple(&pk->stat.txw.count, stmt);
+		v->stmt = delete_stmt;
+		tuple_ref(delete_stmt);
+		tuple_unref(stmt);
+	}
+
+	/*
+	 * Make DELETE statements for secondary indexes and
+	 * insert them into the transaction log.
+	 */
+	int rc = 0;
+	for (uint32_t i = 1; i < space->index_count; i++) {
+		struct vy_lsm *lsm = vy_lsm(space->index[i]);
+		struct txv *delete_txv = txv_new(tx, lsm, delete_stmt);
+		if (delete_txv == NULL) {
+			rc = -1;
+			break;
+		}
+		stailq_insert_entry(&tx->log, delete_txv, v, next_in_log);
+		vy_stmt_counter_acct_tuple(&lsm->stat.txw.count, delete_stmt);
+	}
+	tuple_unref(delete_stmt);
+	return rc;
+}
+
 int
 vy_tx_prepare(struct vy_tx *tx)
 {
@@ -521,6 +629,22 @@ vy_tx_prepare(struct vy_tx *tx)
 		if (v->is_overwritten)
 			continue;
 
+		if (lsm->index_id > 0 && repsert == NULL && delete == NULL) {
+			/*
+			 * This statement is for a secondary index,
+			 * and the statement corresponding to it in
+			 * the primary index was overwritten. This
+			 * can only happen if insertion of DELETE
+			 * into secondary indexes was postponed until
+			 * primary index compaction. In this case
+			 * the DELETE will not be generated, because
+			 * the corresponding statement never made it
+			 * to the primary index LSM tree. So we must
+			 * skip it for secondary indexes as well.
+			 */
+			continue;
+		}
+
 		enum iproto_type type = vy_stmt_type(v->stmt);
 
 		/* Optimize out INSERT + DELETE for the same key. */
@@ -535,6 +659,16 @@ vy_tx_prepare(struct vy_tx *tx)
 			 */
 			type = IPROTO_INSERT;
 			vy_stmt_set_type(v->stmt, type);
+			/*
+			 * In case of INSERT, no statement was actually
+			 * overwritten so no need to generate a deferred
+			 * DELETE for secondary indexes.
+			 */
+			uint8_t flags = vy_stmt_flags(v->stmt);
+			if (flags & VY_STMT_DEFERRED_DELETE) {
+				vy_stmt_set_flags(v->stmt, flags &
+						  ~VY_STMT_DEFERRED_DELETE);
+			}
 		}
 
 		if (!v->is_first_insert && type == IPROTO_INSERT) {
@@ -550,6 +684,11 @@ vy_tx_prepare(struct vy_tx *tx)
 			return -1;
 		assert(v->mem != NULL);
 
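+		/*
+		 * If the overwritten tuple is still found in memory,
+		 * generate DELETEs for secondary indexes right away
+		 * instead of deferring them until compaction.
+		 */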
+		if (lsm->index_id == 0 &&
+		    vy_stmt_flags(v->stmt) & VY_STMT_DEFERRED_DELETE &&
+		    vy_tx_handle_deferred_delete(tx, v) != 0)
+			return -1;
+
 		/* In secondary indexes only REPLACE/DELETE can be written. */
 		vy_stmt_set_lsn(v->stmt, MAX_LSN + tx->psn);
 		const struct tuple **region_stmt =
diff --git a/test/unit/vy_point_lookup.c b/test/unit/vy_point_lookup.c
index 87f26900fb2b31b195b84619ce8a91a74698d414..eee25274f46c43e0c298e22cdb6e318ea4fd84fd 100644
--- a/test/unit/vy_point_lookup.c
+++ b/test/unit/vy_point_lookup.c
@@ -13,6 +13,8 @@
 
 uint32_t schema_version;
 uint32_t space_cache_version;
+struct space *space_by_id(uint32_t id) { return NULL; }
+struct vy_lsm *vy_lsm(struct index *index) { return NULL; }
 
 static int
 write_run(struct vy_run *run, const char *dir_name,
diff --git a/test/vinyl/deferred_delete.result b/test/vinyl/deferred_delete.result
new file mode 100644
index 0000000000000000000000000000000000000000..9811b6bcc8b11a956a243b41c14c37323481eeca
--- /dev/null
+++ b/test/vinyl/deferred_delete.result
@@ -0,0 +1,677 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+--
+-- Create a space with secondary indexes and check that REPLACE and
+-- DELETE requests do not look up the old tuple in the primary index
+-- to generate the DELETE statements for secondary indexes. Instead
+-- DELETEs are generated when the primary index is compacted (gh-2129).
+-- The optimization should work for both non-unique and unique indexes
+-- so mark one of the indexes unique.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+pk = s:create_index('pk', {run_count_per_level = 10})
+---
+...
+i1 = s:create_index('i1', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false})
+---
+...
+i2 = s:create_index('i2', {run_count_per_level = 10, parts = {3, 'unsigned'}, unique = true})
+---
+...
+for i = 1, 10 do s:replace{i, i, i} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+for i = 1, 10, 2 do s:delete{i} end
+---
+...
+for i = 2, 10, 2 do s:replace{i, i * 10, i * 100} end
+---
+...
+-- DELETE/REPLACE does not look up the old tuple in the primary index.
+pk:stat().lookup -- 0
+---
+- 0
+...
+-- DELETEs are not written to secondary indexes.
+pk:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 5 DELETEs
+---
+- 20
+...
+i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs
+---
+- 15
+...
+i2:stat().rows -- ditto
+---
+- 15
+...
+-- Although there are only 5 tuples in the space, we have to look up
+-- overwritten tuples in the primary index hence 15 lookups per SELECT
+-- in a secondary index.
+i1:select()
+---
+- - [2, 20, 200]
+  - [4, 40, 400]
+  - [6, 60, 600]
+  - [8, 80, 800]
+  - [10, 100, 1000]
+...
+i1:stat().get.rows -- 15
+---
+- 15
+...
+pk:stat().lookup -- 15
+---
+- 15
+...
+i2:select()
+---
+- - [2, 20, 200]
+  - [4, 40, 400]
+  - [6, 60, 600]
+  - [8, 80, 800]
+  - [10, 100, 1000]
+...
+i2:stat().get.rows -- 15
+---
+- 15
+...
+pk:stat().lookup -- 30
+---
+- 30
+...
+-- Overwritten/deleted tuples are not stored in the cache so calling
+-- SELECT for a second time does only 5 lookups.
+box.stat.reset()
+---
+...
+i1:select()
+---
+- - [2, 20, 200]
+  - [4, 40, 400]
+  - [6, 60, 600]
+  - [8, 80, 800]
+  - [10, 100, 1000]
+...
+i1:stat().get.rows -- 5
+---
+- 5
+...
+pk:stat().lookup -- 5
+---
+- 5
+...
+i2:select()
+---
+- - [2, 20, 200]
+  - [4, 40, 400]
+  - [6, 60, 600]
+  - [8, 80, 800]
+  - [10, 100, 1000]
+...
+i2:stat().get.rows -- 5
+---
+- 5
+...
+pk:stat().lookup -- 10
+---
+- 10
+...
+-- Clean up the cache.
+vinyl_cache = box.cfg.vinyl_cache
+---
+...
+box.cfg{vinyl_cache = 0}
+---
+...
+box.cfg{vinyl_cache = vinyl_cache}
+---
+...
+-- Compact the primary index to generate deferred DELETEs.
+box.snapshot()
+---
+- ok
+...
+pk:compact()
+---
+...
+while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+pk:stat().rows -- 5 new REPLACEs
+---
+- 5
+...
+i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 10 deferred DELETEs
+---
+- 25
+...
+i2:stat().rows -- ditto
+---
+- 25
+...
+-- Deferred DELETEs must be ignored by the read iterator, because
+-- they may break the read iterator invariant, so they don't reduce
+-- the number of lookups.
+box.stat.reset()
+---
+...
+i1:select()
+---
+- - [2, 20, 200]
+  - [4, 40, 400]
+  - [6, 60, 600]
+  - [8, 80, 800]
+  - [10, 100, 1000]
+...
+i1:stat().get.rows -- 15
+---
+- 15
+...
+pk:stat().lookup -- 15
+---
+- 15
+...
+i2:select()
+---
+- - [2, 20, 200]
+  - [4, 40, 400]
+  - [6, 60, 600]
+  - [8, 80, 800]
+  - [10, 100, 1000]
+...
+i2:stat().get.rows -- 15
+---
+- 15
+...
+pk:stat().lookup -- 30
+---
+- 30
+...
+-- Check that deferred DELETEs are not lost after restart.
+test_run:cmd("restart server default")
+fiber = require('fiber')
+---
+...
+s = box.space.test
+---
+...
+pk = s.index.pk
+---
+...
+i1 = s.index.i1
+---
+...
+i2 = s.index.i2
+---
+...
+i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 10 deferred DELETEs
+---
+- 25
+...
+i2:stat().rows -- ditto
+---
+- 25
+...
+-- Dump deferred DELETEs to disk and compact them.
+-- Check that they clean up garbage statements.
+box.snapshot()
+---
+- ok
+...
+i1:compact()
+---
+...
+while i1:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+i2:compact()
+---
+...
+while i2:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+i1:stat().rows -- 5 new REPLACEs
+---
+- 5
+...
+i2:stat().rows -- ditto
+---
+- 5
+...
+box.stat.reset()
+---
+...
+i1:select()
+---
+- - [2, 20, 200]
+  - [4, 40, 400]
+  - [6, 60, 600]
+  - [8, 80, 800]
+  - [10, 100, 1000]
+...
+i1:stat().get.rows -- 5
+---
+- 5
+...
+pk:stat().lookup -- 5
+---
+- 5
+...
+i2:select()
+---
+- - [2, 20, 200]
+  - [4, 40, 400]
+  - [6, 60, 600]
+  - [8, 80, 800]
+  - [10, 100, 1000]
+...
+i2:stat().get.rows -- 5
+---
+- 5
+...
+pk:stat().lookup -- 10
+---
+- 10
+...
+s:drop()
+---
+...
+--
+-- Check that if the old tuple is found in cache or in memory, then
+-- the DELETE for secondary indexes is generated when the statement
+-- is committed.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+pk = s:create_index('pk', {run_count_per_level = 10})
+---
+...
+sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false})
+---
+...
+for i = 1, 10 do s:replace{i, i} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:count() -- add tuples to the cache
+---
+- 10
+...
+box.stat.reset()
+---
+...
+for i = 1, 10, 2 do s:delete{i} end
+---
+...
+for i = 2, 10, 2 do s:replace{i, i * 10} end
+---
+...
+pk:stat().lookup -- 0
+---
+- 0
+...
+pk:stat().cache.lookup -- 10
+---
+- 10
+...
+pk:stat().cache.get.rows -- 10
+---
+- 10
+...
+pk:stat().memory.iterator.lookup -- 0
+---
+- 0
+...
+sk:stat().rows -- 10 old REPLACEs + 10 DELETEs + 5 new REPLACEs
+---
+- 25
+...
+box.stat.reset()
+---
+...
+for i = 1, 10 do s:replace{i, i * 100} end
+---
+...
+pk:stat().lookup -- 0
+---
+- 0
+...
+pk:stat().cache.lookup -- 10
+---
+- 10
+...
+pk:stat().cache.get.rows -- 0
+---
+- 0
+...
+pk:stat().memory.iterator.lookup -- 10
+---
+- 10
+...
+pk:stat().memory.iterator.get.rows -- 10
+---
+- 10
+...
+sk:stat().rows -- 15 old REPLACEs + 15 DELETEs + 10 new REPLACEs
+---
+- 40
+...
+box.stat.reset()
+---
+...
+for i = 1, 10 do s:delete{i} end
+---
+...
+pk:stat().lookup -- 0
+---
+- 0
+...
+pk:stat().cache.lookup -- 10
+---
+- 10
+...
+pk:stat().cache.get.rows -- 0
+---
+- 0
+...
+pk:stat().memory.iterator.lookup -- 10
+---
+- 10
+...
+pk:stat().memory.iterator.get.rows -- 10
+---
+- 10
+...
+sk:stat().rows -- 25 old REPLACEs + 25 DELETEs
+---
+- 50
+...
+sk:select()
+---
+- []
+...
+pk:stat().lookup -- 0
+---
+- 0
+...
+box.snapshot()
+---
+- ok
+...
+sk:compact()
+---
+...
+while sk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+sk:stat().run_count -- 0
+---
+- 0
+...
+s:drop()
+---
+...
+--
+-- Check that a transaction is aborted if it has read a tuple from
+-- a secondary index that was overwritten in the primary index.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+pk = s:create_index('pk')
+---
+...
+sk = s:create_index('sk', {parts = {2, 'unsigned'}, unique = false})
+---
+...
+s:replace{1, 1}
+---
+- [1, 1]
+...
+box.snapshot()
+---
+- ok
+...
+box.begin()
+---
+...
+sk:select{1}
+---
+- - [1, 1]
+...
+c = fiber.channel(1)
+---
+...
+_ = fiber.create(function() s:replace{1, 10} c:put(true) end)
+---
+...
+c:get()
+---
+- true
+...
+sk:select{1}
+---
+- - [1, 1]
+...
+s:replace{10, 10}
+---
+- [10, 10]
+...
+box.commit() -- error
+---
+- error: Transaction has been aborted by conflict
+...
+s:drop()
+---
+...
+--
+-- Check that if a tuple was overwritten in the transaction write set,
+-- it won't be committed to secondary indexes.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+pk = s:create_index('pk', {run_count_per_level = 10})
+---
+...
+sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false})
+---
+...
+for i = 1, 10 do s:replace{i, i} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+box.begin()
+---
+...
+for i = 1, 10 do s:replace{i, i * 10} end
+---
+...
+for i = 1, 10, 2 do s:delete{i} end
+---
+...
+for i = 2, 10, 2 do s:replace{i, i * 100} end
+---
+...
+box.commit()
+---
+...
+sk:select()
+---
+- - [2, 200]
+  - [4, 400]
+  - [6, 600]
+  - [8, 800]
+  - [10, 1000]
+...
+pk:stat().rows -- 10 old REPLACEs + 5 DELETEs + 5 new REPLACEs
+---
+- 20
+...
+sk:stat().rows -- 10 old REPLACEs + 5 new REPLACEs
+---
+- 15
+...
+-- Compact the primary index to generate deferred DELETEs.
+box.snapshot()
+---
+- ok
+...
+pk:compact()
+---
+...
+while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+-- Compact the secondary index to clean up garbage.
+box.snapshot()
+---
+- ok
+...
+sk:compact()
+---
+...
+while sk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+sk:select()
+---
+- - [2, 200]
+  - [4, 400]
+  - [6, 600]
+  - [8, 800]
+  - [10, 1000]
+...
+pk:stat().rows -- 5 new REPLACEs
+---
+- 5
+...
+sk:stat().rows -- ditto
+---
+- 5
+...
+s:drop()
+---
+...
+--
+-- Check that on recovery we do not apply deferred DELETEs that
+-- have been dumped to disk.
+--
+test_run:cmd("create server test with script='vinyl/low_quota.lua'")
+---
+- true
+...
+test_run:cmd("start server test with args='1048576'")
+---
+- true
+...
+test_run:cmd("switch test")
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+pk = s:create_index('pk', {run_count_per_level = 10})
+---
+...
+sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned', 3, 'string'}, unique = false})
+---
+...
+pad = string.rep('x', 10 * 1024)
+---
+...
+for i = 1, 120 do s:replace{i, i, pad} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+pad = string.rep('y', 10 * 1024)
+---
+...
+for i = 1, 120 do s:replace{i, i, pad} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+sk:stat().rows -- 120 old REPLACEs + 120 new REPLACEs
+---
+- 240
+...
+box.stat.reset()
+---
+...
+-- Compact the primary index to generate deferred DELETEs.
+-- Deferred DELETEs won't fit in memory and trigger dump
+-- of the secondary index.
+pk:compact()
+---
+...
+while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+sk:stat().disk.dump.count -- 1
+---
+- 1
+...
+sk:stat().rows -- 120 old REPLACEs + 120 new REPLACEs + 120 deferred DELETEs
+---
+- 360
+...
+test_run:cmd("restart server test with args='1048576'")
+s = box.space.test
+---
+...
+pk = s.index.pk
+---
+...
+sk = s.index.sk
+---
+...
+-- Should be 360, the same number of statements as before restart.
+-- If we applied all deferred DELETEs, including the dumped ones,
+-- then there would be more.
+sk:stat().rows
+---
+- 360
+...
+s:drop()
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
+...
diff --git a/test/vinyl/deferred_delete.test.lua b/test/vinyl/deferred_delete.test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..d18361a050c2a7d84512debb686ff5f0d9b53810
--- /dev/null
+++ b/test/vinyl/deferred_delete.test.lua
@@ -0,0 +1,261 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+
+--
+-- Create a space with secondary indexes and check that REPLACE and
+-- DELETE requests do not look up the old tuple in the primary index
+-- to generate the DELETE statements for secondary indexes. Instead
+-- DELETEs are generated when the primary index is compacted (gh-2129).
+-- The optimization should work for both non-unique and unique indexes
+-- so mark one of the indexes unique.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+pk = s:create_index('pk', {run_count_per_level = 10})
+i1 = s:create_index('i1', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false})
+i2 = s:create_index('i2', {run_count_per_level = 10, parts = {3, 'unsigned'}, unique = true})
+for i = 1, 10 do s:replace{i, i, i} end
+box.snapshot()
+for i = 1, 10, 2 do s:delete{i} end
+for i = 2, 10, 2 do s:replace{i, i * 10, i * 100} end
+
+-- DELETE/REPLACE does not look up the old tuple in the primary index.
+pk:stat().lookup -- 0
+
+-- DELETEs are not written to secondary indexes.
+pk:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 5 DELETEs
+i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs
+i2:stat().rows -- ditto
+
+-- Although there are only 5 tuples in the space, we have to look up
+-- overwritten tuples in the primary index hence 15 lookups per SELECT
+-- in a secondary index.
+i1:select()
+i1:stat().get.rows -- 15
+pk:stat().lookup -- 15
+i2:select()
+i2:stat().get.rows -- 15
+pk:stat().lookup -- 30
+
+-- Overwritten/deleted tuples are not stored in the cache so calling
+-- SELECT for a second time does only 5 lookups.
+box.stat.reset()
+i1:select()
+i1:stat().get.rows -- 5
+pk:stat().lookup -- 5
+i2:select()
+i2:stat().get.rows -- 5
+pk:stat().lookup -- 10
+
+-- Clean up the cache.
+vinyl_cache = box.cfg.vinyl_cache
+box.cfg{vinyl_cache = 0}
+box.cfg{vinyl_cache = vinyl_cache}
+
+-- Compact the primary index to generate deferred DELETEs.
+box.snapshot()
+pk:compact()
+while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+pk:stat().rows -- 5 new REPLACEs
+i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 10 deferred DELETEs
+i2:stat().rows -- ditto
+
+-- Deferred DELETEs must be ignored by the read iterator, because
+-- they may break the read iterator invariant, so they don't reduce
+-- the number of lookups.
+box.stat.reset()
+i1:select()
+i1:stat().get.rows -- 15
+pk:stat().lookup -- 15
+i2:select()
+i2:stat().get.rows -- 15
+pk:stat().lookup -- 30
+
+-- Check that deferred DELETEs are not lost after restart.
+test_run:cmd("restart server default")
+fiber = require('fiber')
+s = box.space.test
+pk = s.index.pk
+i1 = s.index.i1
+i2 = s.index.i2
+i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 10 deferred DELETEs
+i2:stat().rows -- ditto
+
+-- Dump deferred DELETEs to disk and compact them.
+-- Check that they clean up garbage statements.
+box.snapshot()
+i1:compact()
+while i1:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+i2:compact()
+while i2:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+i1:stat().rows -- 5 new REPLACEs
+i2:stat().rows -- ditto
+box.stat.reset()
+i1:select()
+i1:stat().get.rows -- 5
+pk:stat().lookup -- 5
+i2:select()
+i2:stat().get.rows -- 5
+pk:stat().lookup -- 10
+
+s:drop()
+
+--
+-- Check that if the old tuple is found in cache or in memory, then
+-- the DELETE for secondary indexes is generated when the statement
+-- is committed.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+pk = s:create_index('pk', {run_count_per_level = 10})
+sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false})
+
+for i = 1, 10 do s:replace{i, i} end
+box.snapshot()
+s:count() -- add tuples to the cache
+
+box.stat.reset()
+for i = 1, 10, 2 do s:delete{i} end
+for i = 2, 10, 2 do s:replace{i, i * 10} end
+pk:stat().lookup -- 0
+pk:stat().cache.lookup -- 10
+pk:stat().cache.get.rows -- 10
+pk:stat().memory.iterator.lookup -- 0
+sk:stat().rows -- 10 old REPLACEs + 10 DELETEs + 5 new REPLACEs
+
+box.stat.reset()
+for i = 1, 10 do s:replace{i, i * 100} end
+pk:stat().lookup -- 0
+pk:stat().cache.lookup -- 10
+pk:stat().cache.get.rows -- 0
+pk:stat().memory.iterator.lookup -- 10
+pk:stat().memory.iterator.get.rows -- 10
+sk:stat().rows -- 15 old REPLACEs + 15 DELETEs + 10 new REPLACEs
+
+box.stat.reset()
+for i = 1, 10 do s:delete{i} end
+pk:stat().lookup -- 0
+pk:stat().cache.lookup -- 10
+pk:stat().cache.get.rows -- 0
+pk:stat().memory.iterator.lookup -- 10
+pk:stat().memory.iterator.get.rows -- 10
+sk:stat().rows -- 25 old REPLACEs + 25 DELETEs
+
+sk:select()
+pk:stat().lookup -- 0
+
+box.snapshot()
+sk:compact()
+while sk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+sk:stat().run_count -- 0
+
+s:drop()
+
+--
+-- Check that a transaction is aborted if it has read a tuple from
+-- a secondary index that was overwritten in the primary index.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+pk = s:create_index('pk')
+sk = s:create_index('sk', {parts = {2, 'unsigned'}, unique = false})
+s:replace{1, 1}
+box.snapshot()
+
+box.begin()
+sk:select{1}
+c = fiber.channel(1)
+_ = fiber.create(function() s:replace{1, 10} c:put(true) end)
+c:get()
+sk:select{1}
+s:replace{10, 10}
+box.commit() -- error
+
+s:drop()
+
+--
+-- Check that if a tuple was overwritten in the transaction write set,
+-- it won't be committed to secondary indexes.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+pk = s:create_index('pk', {run_count_per_level = 10})
+sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false})
+for i = 1, 10 do s:replace{i, i} end
+box.snapshot()
+
+box.begin()
+for i = 1, 10 do s:replace{i, i * 10} end
+for i = 1, 10, 2 do s:delete{i} end
+for i = 2, 10, 2 do s:replace{i, i * 100} end
+box.commit()
+
+sk:select()
+
+pk:stat().rows -- 10 old REPLACEs + 5 DELETEs + 5 new REPLACEs
+sk:stat().rows -- 10 old REPLACEs + 5 new REPLACEs
+
+-- Compact the primary index to generate deferred DELETEs.
+box.snapshot()
+pk:compact()
+while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+
+-- Compact the secondary index to clean up garbage.
+box.snapshot()
+sk:compact()
+while sk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+
+sk:select()
+
+pk:stat().rows -- 5 new REPLACEs
+sk:stat().rows -- ditto
+
+s:drop()
+
+--
+-- Check that on recovery we do not apply deferred DELETEs that
+-- have been dumped to disk.
+--
+test_run:cmd("create server test with script='vinyl/low_quota.lua'")
+test_run:cmd("start server test with args='1048576'")
+test_run:cmd("switch test")
+
+fiber = require('fiber')
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+pk = s:create_index('pk', {run_count_per_level = 10})
+sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned', 3, 'string'}, unique = false})
+
+pad = string.rep('x', 10 * 1024)
+for i = 1, 120 do s:replace{i, i, pad} end
+box.snapshot()
+
+pad = string.rep('y', 10 * 1024)
+for i = 1, 120 do s:replace{i, i, pad} end
+box.snapshot()
+
+sk:stat().rows -- 120 old REPLACEs + 120 new REPLACEs
+
+box.stat.reset()
+
+-- Compact the primary index to generate deferred DELETEs.
+-- Deferred DELETEs won't fit in memory and trigger dump
+-- of the secondary index.
+pk:compact()
+while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+
+sk:stat().disk.dump.count -- 1
+
+sk:stat().rows -- 120 old REPLACEs + 120 new REPLACEs + 120 deferred DELETEs
+
+test_run:cmd("restart server test with args='1048576'")
+s = box.space.test
+pk = s.index.pk
+sk = s.index.sk
+
+-- Should be 360, the same number of statements as before restart.
+-- If we applied all deferred DELETEs, including the dumped ones,
+-- then there would be more.
+sk:stat().rows
+
+s:drop()
+
+test_run:cmd("switch default")
+test_run:cmd("stop server test")
+test_run:cmd("cleanup server test")
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 28271fc92b7b93aa0d920a008bba2e01a47133a1..cdffa198229aa6bbcfb09048bee647bdd220b8d4 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1844,3 +1844,81 @@ s.index.sk:stat().memory.rows
 s:drop()
 ---
 ...
+--
+-- Check that tarantool doesn't hang or crash if an error
+-- occurs while writing a deferred DELETE to the WAL.
+--
+fiber = require('fiber')
+---
+...
+errinj = box.error.injection
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk', {run_count_per_level = 10})
+---
+...
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+---
+...
+s:replace{1, 10}
+---
+- [1, 10]
+...
+box.snapshot()
+---
+- ok
+...
+s:replace{1, 20}
+---
+- [1, 20]
+...
+box.snapshot()
+---
+- ok
+...
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0.001)
+---
+- ok
+...
+errinj.set("ERRINJ_WAL_IO", true)
+---
+- ok
+...
+errors = box.stat.ERROR.total
+---
+...
+s.index.pk:compact()
+---
+...
+while box.stat.ERROR.total - errors == 0 do fiber.sleep(0.001) end
+---
+...
+s.index.pk:stat().disk.compact.count -- 0
+---
+- 0
+...
+errinj.set("ERRINJ_WAL_IO", false)
+---
+- ok
+...
+while s.index.pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+s.index.pk:stat().disk.compact.count -- 1
+---
+- 1
+...
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0)
+---
+- ok
+...
+box.snapshot() -- ok
+---
+- ok
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 000067d35bef8ead93ba3782a7196fd4e3033723..c2332a692df3c680c54e45e96a386593928ed168 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -736,3 +736,32 @@ s.index.sk:select()
 s.index.sk:stat().memory.rows
 
 s:drop()
+
+--
+-- Check that tarantool doesn't hang or crash if an error
+-- occurs while writing a deferred DELETE to the WAL.
+--
+fiber = require('fiber')
+errinj = box.error.injection
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk', {run_count_per_level = 10})
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+s:replace{1, 10}
+box.snapshot()
+s:replace{1, 20}
+box.snapshot()
+
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0.001)
+errinj.set("ERRINJ_WAL_IO", true)
+errors = box.stat.ERROR.total
+s.index.pk:compact()
+while box.stat.ERROR.total - errors == 0 do fiber.sleep(0.001) end
+s.index.pk:stat().disk.compact.count -- 0
+errinj.set("ERRINJ_WAL_IO", false)
+while s.index.pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+s.index.pk:stat().disk.compact.count -- 1
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0)
+
+box.snapshot() -- ok
+s:drop()
diff --git a/test/vinyl/info.result b/test/vinyl/info.result
index 112ba85e48a5cfc8918edda09b32f998a9b82c45..95e8cc60ee1c9a0ef2a9e6be9c5c0947158eabf7 100644
--- a/test/vinyl/info.result
+++ b/test/vinyl/info.result
@@ -1036,10 +1036,10 @@ s:bsize()
 ---
 - 0
 ...
-i1 = s:create_index('i1', {parts = {1, 'unsigned'}, run_count_per_level = 1})
+i1 = s:create_index('i1', {parts = {1, 'unsigned'}, run_count_per_level = 10})
 ---
 ...
-i2 = s:create_index('i2', {parts = {2, 'unsigned'}, run_count_per_level = 1})
+i2 = s:create_index('i2', {parts = {2, 'unsigned'}, run_count_per_level = 10})
 ---
 ...
 s:bsize()
@@ -1162,7 +1162,7 @@ s:bsize()
 i1:len(), i2:len()
 ---
 - 150
-- 150
+- 100
 ...
 i1:bsize(), i2:bsize()
 ---
@@ -1189,13 +1189,25 @@ i2:bsize() == st2.memory.index_size + st2.disk.index_size + st2.disk.bloom_size
 ---
 - true
 ...
+-- Compact the primary index first to generate deferred DELETEs.
+-- Then dump them and compact the secondary index.
 box.snapshot()
 ---
 - ok
 ...
+i1:compact()
+---
+...
 wait(function() return i1:stat() end, st1, 'disk.compact.count', 1)
 ---
 ...
+box.snapshot()
+---
+- ok
+...
+i2:compact()
+---
+...
 wait(function() return i2:stat() end, st2, 'disk.compact.count', 1)
 ---
 ...
diff --git a/test/vinyl/info.test.lua b/test/vinyl/info.test.lua
index 863a87933a407dd21657669dbf39452df18a3194..5aebd0a847d5ed895ec64034cb80a7000209206a 100644
--- a/test/vinyl/info.test.lua
+++ b/test/vinyl/info.test.lua
@@ -322,8 +322,8 @@ s:drop()
 
 s = box.schema.space.create('test', {engine = 'vinyl'})
 s:bsize()
-i1 = s:create_index('i1', {parts = {1, 'unsigned'}, run_count_per_level = 1})
-i2 = s:create_index('i2', {parts = {2, 'unsigned'}, run_count_per_level = 1})
+i1 = s:create_index('i1', {parts = {1, 'unsigned'}, run_count_per_level = 10})
+i2 = s:create_index('i2', {parts = {2, 'unsigned'}, run_count_per_level = 10})
 s:bsize()
 i1:len(), i2:len()
 i1:bsize(), i2:bsize()
@@ -365,8 +365,13 @@ i2:len() == st2.memory.rows + st2.disk.rows
 i1:bsize() == st1.memory.index_size + st1.disk.index_size + st1.disk.bloom_size
 i2:bsize() == st2.memory.index_size + st2.disk.index_size + st2.disk.bloom_size + st2.disk.bytes
 
+-- Compact the primary index first to generate deferred DELETEs.
+-- Then dump them and compact the secondary index.
 box.snapshot()
+i1:compact()
 wait(function() return i1:stat() end, st1, 'disk.compact.count', 1)
+box.snapshot()
+i2:compact()
 wait(function() return i2:stat() end, st2, 'disk.compact.count', 1)
 st1 = i1:stat()
 st2 = i2:stat()
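
The ordering enforced here is easy to miss when skimming: deferred DELETEs
only come into existence when the primary index is compacted, and they then
sit in the secondary index's in-memory level until the next checkpoint. A
condensed, annotated sketch of the assumed sequence, reusing the i1/i2
objects from the test:

    i1:compact()   -- primary compaction detects overwritten tuples and
                   -- generates deferred DELETEs for the secondary index
    box.snapshot() -- dump those DELETEs to disk so that i2 gets a run
                   -- actually containing them
    i2:compact()   -- secondary compaction can now purge the stale rows

Without the intermediate snapshot, i2 would presumably have nothing new on
disk to merge, and the wait on disk.compact.count could stall.
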
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 4982630235d9bb9938c684eea8d3ead36c6391fa..ee14cd512151ff0f9f5e21f914a8134447d96fb3 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -253,17 +253,17 @@ result
   - - 00000000000000000008.run
     - - HEADER:
           lsn: 10
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['ёёё', null]
       - HEADER:
           lsn: 9
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['эээ', null]
       - HEADER:
           lsn: 8
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['ЭЭЭ', null]
       - HEADER:
@@ -285,8 +285,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 90
-          unpacked_size: 71
+          size: 102
+          unpacked_size: 83
           row_count: 3
           min_key: ['ёёё']
   - - 00000000000000000012.run
@@ -295,20 +295,23 @@ result
           type: REPLACE
         BODY:
           tuple: ['ёёё', 123]
+          tuple_meta: {1: 1}
       - HEADER:
           lsn: 13
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['ююю', 789]
+          tuple_meta: {1: 1}
       - HEADER:
           lsn: 12
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['ЮЮЮ', 456]
+          tuple_meta: {1: 1}
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
+          row_index: "\0\0\0\0\0\0\0\x14\0\0\0*"
   - - 00000000000000000006.index
     - - HEADER:
           type: RUNINFO
@@ -331,17 +334,17 @@ result
   - - 00000000000000000006.run
     - - HEADER:
           lsn: 10
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [null, 'ёёё']
       - HEADER:
           lsn: 9
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [null, 'эээ']
       - HEADER:
           lsn: 8
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [null, 'ЭЭЭ']
       - HEADER:
@@ -357,41 +360,36 @@ result
           page_count: 1
           bloom_filter: <bloom_filter>
           max_lsn: 13
-          min_key: [null, 'ёёё']
+          min_key: [123, 'ёёё']
       - HEADER:
           type: PAGEINFO
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 110
-          unpacked_size: 91
-          row_count: 4
-          min_key: [null, 'ёёё']
+          size: 90
+          unpacked_size: 71
+          row_count: 3
+          min_key: [123, 'ёёё']
   - - 00000000000000000010.run
     - - HEADER:
-          lsn: 11
-          type: DELETE
-        BODY:
-          key: [null, 'ёёё']
-      - HEADER:
           lsn: 11
           type: REPLACE
         BODY:
           tuple: [123, 'ёёё']
       - HEADER:
           lsn: 12
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [456, 'ЮЮЮ']
       - HEADER:
           lsn: 13
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [789, 'ююю']
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\02"
+          row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
 ...
 test_run:cmd("clear filter")
 ---
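
Two layout changes are visible in this dump: statements formerly saved as
INSERT are now written as REPLACE, and some statements gain a
tuple_meta: {1: 1} field, which by the look of it is a per-statement flag
marking rows whose DELETEs must be deferred until compaction (the exact key
assignment is not visible in this diff, so treat that reading as an
assumption). The size and row_index deltas are consistent with one extra
4-byte body field per statement; a throwaway Lua 5.1 check, using the two
blobs from the first ROWINDEX change above (00000000000000000012.run):

    -- Decode a ROWINDEX blob: big-endian uint32 offsets, one per row.
    local function be32(s, i)
        local b1, b2, b3, b4 = s:byte(i, i + 3)
        return ((b1 * 256 + b2) * 256 + b3) * 256 + b4
    end
    local old = "\000\000\000\000\000\000\000\016\000\000\000\034"
    local new = "\000\000\000\000\000\000\000\020\000\000\000\042"
    print(be32(old, 1), be32(old, 5), be32(old, 9)) -- 0  16  34
    print(be32(new, 1), be32(new, 5), be32(new, 9)) -- 0  20  42
    -- Each statement grew by 4 bytes (one body key byte plus the 3-byte
    -- msgpack map {1: 1}), shifting the later offsets by 4 and 8; three
    -- rows at +4 each also matches the page size going from 90 to 102.
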
diff --git a/test/vinyl/quota.result b/test/vinyl/quota.result
index e323bc4e24330bd875aa6b8cb63105fb513fd0c2..480421852174062e617ee8e5a7656543453fda4f 100644
--- a/test/vinyl/quota.result
+++ b/test/vinyl/quota.result
@@ -89,7 +89,7 @@ _ = space:replace{1, 1, string.rep('a', 1024 * 1024 * 5)}
 ...
 box.stat.vinyl().quota.used
 ---
-- 5341228
+- 5341267
 ...
 space:drop()
 ---
diff --git a/test/vinyl/tx_gap_lock.result b/test/vinyl/tx_gap_lock.result
index 150826cbbe47474f0c1f54dc7942edd576ecd1d6..a456c017e7e1a364309898cf5008dbe9587724e2 100644
--- a/test/vinyl/tx_gap_lock.result
+++ b/test/vinyl/tx_gap_lock.result
@@ -1194,8 +1194,8 @@ s:drop()
 ---
 ...
 ----------------------------------------------------------------
--- gh-2534: Iterator over a secondary index doesn't double track
--- results in the primary index.
+-- Iterator over a secondary index tracks all results in the
+-- primary index. Needed for gh-2129.
 ----------------------------------------------------------------
 s = box.schema.space.create('test', {engine = 'vinyl'})
 ---
@@ -1219,23 +1219,23 @@ gap_lock_count() -- 0
 _ = s.index.sk:select({}, {limit = 50})
 ---
 ...
-gap_lock_count() -- 1
+gap_lock_count() -- 51
 ---
-- 1
+- 51
 ...
 for i = 1, 100 do s.index.sk:get(i) end
 ---
 ...
-gap_lock_count() -- 51
+gap_lock_count() -- 151
 ---
-- 51
+- 151
 ...
 _ = s.index.sk:select()
 ---
 ...
-gap_lock_count() -- 1
+gap_lock_count() -- 101
 ---
-- 1
+- 101
 ...
 box.commit()
 ---
diff --git a/test/vinyl/tx_gap_lock.test.lua b/test/vinyl/tx_gap_lock.test.lua
index 4d8d21d85911943d32986fe0e848a70d4c277cdd..4ad558608f06620155f0e841e739853f7945974f 100644
--- a/test/vinyl/tx_gap_lock.test.lua
+++ b/test/vinyl/tx_gap_lock.test.lua
@@ -380,8 +380,8 @@ c4:commit()
 
 s:drop()
 ----------------------------------------------------------------
--- gh-2534: Iterator over a secondary index doesn't double track
--- results in the primary index.
+-- Iterator over a secondary index tracks all results in the
+-- primary index. Needed for gh-2129.
 ----------------------------------------------------------------
 s = box.schema.space.create('test', {engine = 'vinyl'})
 _ = s:create_index('pk', {parts = {1, 'unsigned'}})
@@ -390,11 +390,11 @@ for i = 1, 100 do s:insert{i, i} end
 box.begin()
 gap_lock_count() -- 0
 _ = s.index.sk:select({}, {limit = 50})
-gap_lock_count() -- 1
-for i = 1, 100 do s.index.sk:get(i) end
 gap_lock_count() -- 51
+for i = 1, 100 do s.index.sk:get(i) end
+gap_lock_count() -- 151
 _ = s.index.sk:select()
-gap_lock_count() -- 1
+gap_lock_count() -- 101
 box.commit()
 gap_lock_count() -- 0
 s:drop()
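
The new expected numbers all follow from the rule in the comment above:
besides the interval lock on the scanned secondary-index range, every
returned tuple now also takes a point lock in the primary index. A breakdown
of the three checkpoints as plain arithmetic (the merging of overlapping
locks on the same index is inferred from the old expected values):

    -- sk:select({}, {limit = 50}): 1 sk interval + 50 pk points
    assert(1 + 50 == 51)
    -- sk:get(i), i = 1..100: keys 1..50 lie inside the tracked sk
    -- interval and reuse existing pk points; each of 51..100 adds
    -- one sk lock and one pk point
    assert(51 + 50 * 2 == 151)
    -- sk:select(): all sk locks merge into one full-range interval,
    -- the pk keeps one point per distinct tuple
    assert(1 + 100 == 101)
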
diff --git a/test/vinyl/write_iterator.result b/test/vinyl/write_iterator.result
index c38de5d39730ad45a93f9e522bd5a80808e12fb7..cf1e426c3b521c855cb43083260f89f49efa883d 100644
--- a/test/vinyl/write_iterator.result
+++ b/test/vinyl/write_iterator.result
@@ -741,6 +741,11 @@ space:drop()
 s = box.schema.space.create('test', {engine = 'vinyl'})
 ---
 ...
+-- Install an on_replace trigger to disable the DELETE optimization
+-- in the secondary index (gh-2129).
+_ = s:on_replace(function() end)
+---
+...
 pk = s:create_index('primary', {run_count_per_level = 1})
 ---
 ...
diff --git a/test/vinyl/write_iterator.test.lua b/test/vinyl/write_iterator.test.lua
index 73c90c423b3776e87bf4409aab9cda84e63960b8..a1de240f1c58c05cf19b17a30467d948b7b70963 100644
--- a/test/vinyl/write_iterator.test.lua
+++ b/test/vinyl/write_iterator.test.lua
@@ -317,6 +317,9 @@ space:drop()
 -- gh-2875 INSERT+DELETE pairs are annihilated on compaction
 
 s = box.schema.space.create('test', {engine = 'vinyl'})
+-- Install an on_replace trigger to disable the DELETE optimization
+-- in the secondary index (gh-2129).
+_ = s:on_replace(function() end)
 pk = s:create_index('primary', {run_count_per_level = 1})
 sk = s:create_index('secondary', {run_count_per_level = 1, parts = {2, 'unsigned'}})
 PAD1 = 100
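
Why a no-op trigger has this effect deserves a word: with deferred DELETEs,
the DELETE statements for a secondary index are not produced at transaction
time, so the INSERT+DELETE pairs this test is about would never meet during
secondary-index compaction. Installing any on_replace trigger presumably
forces the old tuple to be fetched when the statement executes (triggers
receive the old and new tuples), which lets the secondary-index DELETEs be
generated eagerly again. A minimal sketch of the pattern, with placeholder
names:

    t = box.schema.space.create('example', {engine = 'vinyl'})
    -- The empty body is the point: merely having a trigger installed
    -- changes how DELETEs reach the secondary index.
    _ = t:on_replace(function(old, new) end)
    _ = t:create_index('pk')
    _ = t:create_index('sk', {unique = false, parts = {2, 'unsigned'}})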