diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 17ae21102bfeed3ebd07f166c355831bd0e86936..71c7fbbcdf3e610420f383506c9bf5d11c79c903 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -1397,6 +1397,23 @@ memtx_space_finish_alter(struct space *old_space, struct space *new_space)
 		new_memtx_space->bsize = old_memtx_space->bsize;
+ * Invalidate the space in transaction manager.
+ */
+static void
+memtx_space_invalidate(struct space *space)
+	/*
+	 * Abort all concurrent transactions and invalidate space in MVCC
+	 * only if it is enabled. Otherwise, all concurrent transactions
+	 * will be aborted when DDL will be prepared.
+	 */
+	if (memtx_tx_manager_use_mvcc_engine) {
+		memtx_tx_abort_all_for_ddl(in_txn());
+		memtx_tx_invalidate_space(space, in_txn());
+	}
 /* }}} DDL */
 static const struct space_vtab memtx_space_vtab = {
@@ -1421,7 +1438,7 @@ static const struct space_vtab memtx_space_vtab = {
 	/* .prepare_alter = */ memtx_space_prepare_alter,
 	/* .finish_alter = */ memtx_space_finish_alter,
 	/* .prepare_upgrade = */ memtx_space_prepare_upgrade,
-	/* .invalidate = */ generic_space_invalidate,
+	/* .invalidate = */ memtx_space_invalidate,
 struct space *
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index 1e776c584d4e7ca6bb1cb98aaa3bbbd232a68b98..4c0980598ce11df34b9030e95c1704057d5168e1 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -682,6 +682,12 @@ memtx_tx_acquire_ddl(struct txn *tx)
 	(void) txn_can_yield(tx, false);
+ * Clean and clear all read lists of @a txn.
+ */
+static void
+memtx_tx_clear_txn_read_lists(struct txn *txn);
 memtx_tx_abort_all_for_ddl(struct txn *ddl_owner)
@@ -696,7 +702,8 @@ memtx_tx_abort_all_for_ddl(struct txn *ddl_owner)
 		to_be_aborted->status = TXN_ABORTED;
 		txn_set_flags(to_be_aborted, TXN_IS_CONFLICTED);
-		say_warn("Transaction committing DDL (id=%lld) has aborted "
+		memtx_tx_clear_txn_read_lists(to_be_aborted);
+		say_warn("Transaction processing DDL (id=%lld) has aborted "
 			 "another TX (id=%lld)", (long long) ddl_owner->id,
 			 (long long) to_be_aborted->id);
@@ -1199,51 +1206,8 @@ static void
 memtx_tx_story_unlink_top_on_space_delete(struct memtx_story *story,
 					  uint32_t idx)
-	assert(story != NULL);
-	assert(idx < story->index_count);
 	struct memtx_story_link *link = &story->link[idx];
-	assert(link->newer_story == NULL);
-	/*
-	 * Note that link[idx].in_index may not be the same as
-	 * story->space->index[idx] in case space is going to be deleted
-	 * in memtx_tx_on_space_delete(): during space alter operation we
-	 * swap all indexes to the new space object and instead use dummy
-	 * structs.
-	 */
-	struct index *index = story->link[idx].in_index;
 	struct memtx_story *old_story = link->older_story;
-	assert(old_story == NULL || old_story->link[idx].in_index == NULL);
-	struct tuple *old_tuple = old_story == NULL ? NULL : old_story->tuple;
-	struct tuple *removed, *unused;
-	if (index_replace(index, story->tuple, old_tuple,
-			  DUP_INSERT, &removed, &unused) != 0) {
-		diag_log();
-		unreachable();
-		panic("failed to rebind story in index");
-	}
-	assert(story->tuple == removed ||
-	       (removed == NULL && tuple_key_is_excluded(story->tuple,
-							 index->def->key_def,
-							 MULTIKEY_NONE)));
-	story->link[idx].in_index = NULL;
-	if (old_story != NULL)
-		old_story->link[idx].in_index = index;
-	/*
-	 * A space holds references to all his tuples.
-	 * All tuples that are physically in the primary index are referenced.
-	 * Thus we have to reference the tuple that was added to the primary
-	 * index and dereference the tuple that was removed from it.
-	 */
-	if (idx == 0) {
-		if (old_story != NULL)
-			memtx_tx_ref_to_primary(old_story);
-		memtx_tx_unref_from_primary(story);
-	}
-	/* Now simply unlink the story. */
 	if (old_story != NULL)
 		memtx_tx_story_unlink(story, old_story, idx);
@@ -1279,7 +1243,6 @@ memtx_tx_story_unlink_both_on_space_delete(struct memtx_story *story,
 	assert(idx < story->index_count);
 	struct memtx_story_link *link = &story->link[idx];
 	if (link->newer_story == NULL) {
-		assert(link->in_index != NULL);
 		memtx_tx_story_unlink_top_on_space_delete(story, idx);
 	} else {
 		memtx_tx_story_unlink_both_common(story, idx);
@@ -1329,11 +1292,7 @@ memtx_tx_story_reorder(struct memtx_story *story,
- * Unlink @a story from all chains and remove corresponding tuple from
- * indexes if necessary: used in `memtx_tx_on_space_delete` — intentionally
- * violates the top of the history chain invariant (see
- * `memtx_tx_story_full_unlink_story_gc_step`), since all stories are deleted
- * anyways.
+ * Unlink @a story from all chains, must be already removed from the index.
 static void
 memtx_tx_story_full_unlink_on_space_delete(struct memtx_story *story)
@@ -1341,38 +1300,7 @@ memtx_tx_story_full_unlink_on_space_delete(struct memtx_story *story)
 	for (uint32_t i = 0; i < story->index_count; i++) {
 		struct memtx_story_link *link = &story->link[i];
 		if (link->newer_story == NULL) {
-			/*
-			 * We are at the top of the chain. That means
-			 * that story->tuple might be in index. If the story
-			 * actually deletes the tuple and is present in index,
-			 * it must be deleted from index.
-			 */
-			if (story->del_psn > 0 && link->in_index != NULL) {
-				struct index *index = link->in_index;
-				struct tuple *removed, *unused;
-				if (index_replace(index, story->tuple, NULL,
-						  DUP_INSERT,
-						  &removed, &unused) != 0) {
-					diag_log();
-					unreachable();
-					panic("failed to rollback change");
-				}
-				struct key_def *key_def = index->def->key_def;
-				assert(story->tuple == removed ||
-				       (removed == NULL &&
-					tuple_key_is_excluded(story->tuple,
-							      key_def,
-							      MULTIKEY_NONE)));
-				(void)key_def;
-				link->in_index = NULL;
-				/*
-				 * All tuples in pk are referenced.
-				 * Once removed it must be unreferenced.
-				 */
-				if (i == 0)
-					memtx_tx_unref_from_primary(story);
-			}
+			assert(link->in_index == NULL);
 			memtx_tx_story_unlink(story, link->older_story, i);
 		} else {
 			/* Just unlink from list */
@@ -2338,6 +2266,43 @@ memtx_tx_history_rollback_deleted_story(struct txn_stmt *stmt)
 	memtx_tx_story_unlink_deleted_by(del_story, stmt);
+ * The helper rolls back a statements that is empty - has no stories
+ * linked. It can happen due to several reasons:
+ * 1. MVCC hasn't created stories for the stmt. It happens when space is
+ *    ephemeral or when the statement has deleted nothing. In this case
+ *    helper does nothing.
+ * 2. MVCC created stories for the statement, but they were deleted due to
+ *    DDL - here are 3 types of such transactions. First one is concurrent
+ *    with DDL. We shouldn't roll them back because we have already handled
+ *    them on DDL. Second one is DDL itself (`is_schema_changed` flag is set)
+ *    since stories of all the DML operations that happened before DDL were
+ *    deleted. We must roll its statements back because now the space contain
+ *    all its tuples. Third type is transactions prepared before DDL. We've
+ *    also removed their stories on DDL, so here we should roll them back
+ *    without stories if they have failed to commit.
+ */
+memtx_tx_history_rollback_empty_stmt(struct txn_stmt *stmt)
+	struct tuple *old_tuple = stmt->rollback_info.old_tuple;
+	struct tuple *new_tuple = stmt->rollback_info.new_tuple;
+	if (!stmt->txn->is_schema_changed && stmt->txn->psn == 0)
+		return;
+	if (stmt->space->def->opts.is_ephemeral ||
+	    (old_tuple == NULL && new_tuple == NULL))
+		return;
+	for (size_t i = 0; i < stmt->space->index_count; i++) {
+		struct tuple *unused;
+		if (index_replace(stmt->space->index[i], new_tuple, old_tuple,
+				  DUP_REPLACE_OR_INSERT, &unused,
+				  &unused) != 0) {
+			panic("failed to rebind story in index on "
+			      "rollback of statement without story");
+		}
+	}
 memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
@@ -2362,6 +2327,8 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
 	else if (stmt->del_story != NULL)
+	else
+		memtx_tx_history_rollback_empty_stmt(stmt);
 	assert(stmt->add_story == NULL && stmt->del_story == NULL);
@@ -2399,7 +2366,8 @@ memtx_tx_history_remove_stmt(struct txn_stmt *stmt)
 	if (stmt->del_story != NULL)
-	stmt->engine_savepoint = NULL;
+	if (stmt->txn->status == TXN_ABORTED)
+		stmt->engine_savepoint = NULL;
@@ -2689,12 +2657,6 @@ memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
- * Clean and clear all read lists of @a txn.
- */
-static void
-memtx_tx_clear_txn_read_lists(struct txn *txn);
 memtx_tx_prepare_finalize(struct txn *txn)
@@ -2919,20 +2881,68 @@ memtx_tx_on_index_delete(struct index *index)
-memtx_tx_on_space_delete(struct space *space)
+memtx_tx_invalidate_space(struct space *space, struct txn *active_txn)
+	struct memtx_story *story;
+	/*
+	 * Phase one: fill the indexes with actual tuples. Here we insert
+	 * all tuples visible to `active_txn`.
+	 */
+	rlist_foreach_entry(story, &space->memtx_stories, in_space_stories) {
+		assert(story->index_count == space->index_count);
+		for (uint32_t i = 0; i < story->index_count; i++) {
+			struct index *index = story->link[i].in_index;
+			if (index == NULL)
+				continue;
+			/* Mark as not in index. */
+			story->link[i].in_index = NULL;
+			/* Skip chains of excluded tuples. */
+			if (tuple_key_is_excluded(story->tuple,
+						  index->def->key_def,
+						  MULTIKEY_NONE))
+				continue;
+			struct tuple *new_tuple = NULL;
+			bool is_own_change;
+			memtx_tx_story_find_visible_tuple(story, active_txn, i,
+							  true, &new_tuple,
+							  &is_own_change);
+			/* Visible tuple is already in index - do nothing. */
+			if (new_tuple == story->tuple)
+				continue;
+			struct tuple *unused;
+			if (index_replace(index, story->tuple, new_tuple,
+					  DUP_REPLACE, &unused,
+					  &unused) != 0) {
+				diag_log();
+				unreachable();
+				panic("failed to rebind story in index on "
+				      "space invalidation");
+			}
+			if (i == 0) {
+				if (new_tuple != NULL) {
+					memtx_tx_ref_to_primary(
+						memtx_tx_story_get(new_tuple));
+				}
+				memtx_tx_unref_from_primary(story);
+			}
+		}
+	}
+	/*
+	 * Phase two: destroy all the stories. They are expected to be unlinked
+	 * from the indexes during the first phase.
+	 */
 	while (!rlist_empty(&space->memtx_stories)) {
-		struct memtx_story *story
-			= rlist_first_entry(&space->memtx_stories,
-					    struct memtx_story,
-					    in_space_stories);
-		/*
-		 * Space is to be altered (not necessarily dropped). Since
-		 * this operation is considered to be DDL, all other
-		 * transactions will be aborted anyway. We can't postpone
-		 * rollback till actual call of commit/rollback since stories
-		 * should be destroyed immediately.
-		 */
+		story = rlist_first_entry(&space->memtx_stories,
+					  struct memtx_story,
+					  in_space_stories);
 		if (story->add_stmt != NULL)
 		while (story->del_stmt != NULL)
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index 5ab55817d5f3d633bb9856b431ce7d3297c65167..992a12609197a38a25b9bc9e1f42c08570ee61d1 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -413,15 +413,15 @@ void
 memtx_tx_on_index_delete(struct index *index);
- * Notify manager the a space is deleted.
- * It's necessary because there is a chance that garbage collector hasn't
- * deleted all stories of that space and in that case some actions of
- * story's destructor are not applicable.
+ * Invalidate space in memtx tx: remove all the objects associated with
+ * space and its schema. The helper is supposed to be called when there is
+ * only one active transaction that is passed as `active_txn`. The indexes
+ * are populated with tuples according to what `active_txn` observes.
  * NB: can trigger story garbage collection.
-memtx_tx_on_space_delete(struct space *space);
+memtx_tx_invalidate_space(struct space *space, struct txn *active_txn);
  * Create a snapshot cleaner.
diff --git a/src/box/space.c b/src/box/space.c
index 48a2d6240ac234fa906041d34f09c4856ff42a2d..5477a58d907fbf7d4fad3d788b4201a621ac20e1 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -442,7 +442,7 @@ void
 space_delete(struct space *space)
-	memtx_tx_on_space_delete(space);
+	assert(rlist_empty(&space->memtx_stories));
 	for (uint32_t j = 0; j <= space->index_id_max; j++) {
 		struct index *index = space->index_map[j];
 		if (index != NULL)
diff --git a/test/box-luatest/mvcc_ddl_test.lua b/test/box-luatest/mvcc_ddl_test.lua
index 0b6b1080fc8e517ae84191c30e923812f3480566..0c5c8f7afd357324c0aa29b11cde0cdc9e5be9ec 100644
--- a/test/box-luatest/mvcc_ddl_test.lua
+++ b/test/box-luatest/mvcc_ddl_test.lua
@@ -137,3 +137,282 @@ g.test_reader_list_use_after_free = function(cg)
+-- The test checks if stories associated with old schema cannot
+-- be access on index creation
+-- gh-10096
+g.test_dont_access_old_stories_on_index_create = function(cg)
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1}}})
+        s:create_index('sk1', {parts = {{2}}, unique = true})
+        s:insert{1, 1, 1, 0}
+        -- Collect stories to be independent from GC steps
+        box.internal.memtx_tx_gc(100)
+        -- Create index in a separate transaction
+        -- Transaction creating index will create story for our tuple
+        -- when it reads it to insert into the new index
+        -- The problem is the story is created with old index_count and
+        -- does not have links for created index
+        local created_index = false
+        local f = fiber.create(function()
+            s:create_index('sk2', {parts = {{3}}, unique = true})
+            created_index = true
+        end)
+        f:set_joinable(true)
+        -- Make sure txn is still in progress
+        assert(not created_index)
+        s:replace{1, 1, 1, 1}
+        local ok = f:join()
+        t.assert(ok)
+    end)
+-- The test checks if stories associated with old schema cannot
+-- be access when index is being altered
+-- gh-10097
+g.test_dont_access_old_stories_on_index_alter = function(cg)
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1}}})
+        s:create_index('sk1', {parts = {{2}}, unique = true})
+        s:insert{1, 1, 1, 0}
+        -- Collect stories to be independent from GC steps
+        box.internal.memtx_tx_gc(100)
+        -- Alter index in a separate transaction
+        -- Transaction creating index will create story for our tuple
+        -- when it reads it to insert into the new index
+        -- The problem is the story link points to the old index, but
+        -- it was replaced with the new one (new index object, hence,
+        -- new address)
+        local altered_index = false
+        local f = fiber.create(function()
+            s.index.sk1:alter({parts = {{3}}, unique = true})
+            altered_index = true
+        end)
+        f:set_joinable(true)
+        -- Make sure txn is still in progress
+        assert(not altered_index)
+        s:replace{1, 1, 1, 1}
+        local ok = f:join()
+        t.assert(ok)
+    end)
+-- The test covers a crash when using count after DDL because
+-- DDL has violated some MVCC invariants that are checked in count
+-- gh-10474, case 3
+g.test_count_crashes_after_ddl = function(cg)
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        local s = box.schema.create_space('test')
+        s:create_index('pk')
+        s:create_index('sk', {parts={{2}}})
+        -- Do a sequence of operations under WAL delay:
+        -- Replace
+        -- DDL
+        -- Replace
+        box.error.injection.set('ERRINJ_WAL_DELAY', true)
+        fiber.create(function() box.space.test:replace{1, 1, 1} end)
+        local f = fiber.create(function()
+            box.space.test:format{{name = 'field1', type = 'scalar'}}
+        end)
+        fiber.create(function() box.space.test:replace{1, 1, 1} end)
+        f:set_joinable(true)
+        box.error.injection.set('ERRINJ_WAL_DELAY', false)
+        local ok = f:join()
+        t.assert(ok)
+        t.assert_equals(box.space.test:count(), 1)
+    end)
+-- The test checks if isolation of transactions is not broken when DDL
+-- is committed. It happened because we removed all memtx stories on commit
+-- eariler.
+g.test_txn_isolation_on_ddl_commit = function(cg)
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1}}})
+        -- Few tuples to do DDL without yields
+        for i = 1, 5 do
+            s:replace{i}
+        end
+        -- Collect stories to be independent from GC steps
+        box.internal.memtx_tx_gc(100)
+        -- Alter space in a separate transaction
+        local altered_space = false
+        local f = fiber.create(function()
+            s:format({{name = 'field1', type = 'unsigned'}})
+            altered_space = true
+        end)
+        f:set_joinable(true)
+        -- Make sure txn is still in progress
+        t.assert(not altered_space)
+        box.begin()
+        -- Drop all the tuples in the new transaction
+        for i = 1, 5 do
+            s:delete(i)
+        end
+        -- Check if tuples were deleted
+        t.assert_equals(s:select{}, {})
+        -- Wait for DDL to be committed and make sure it was successful
+        local ok = f:join()
+        t.assert(ok)
+        -- Check contents after yield
+        t.assert_equals(s:select{}, {})
+        box.commit()
+        -- We committed a transaction deleting all tuples just now, result
+        -- still must be empty
+        t.assert_equals(s:select{}, {})
+    end)
+-- The test checks if DDL aborts all conflicting transactions - not preparing
+-- the DDL but the operation itself. We should check this behavior because we
+-- delete all memtx stories on DDL so we cannot support transaction isolation
+-- anymore. If we didn't abort active transactions, rollback of DDL could
+-- violate their isolation.
+g.test_abort_all_on_ddl = function(cg)
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1}}})
+        local errmsg = 'Transaction has been aborted by conflict'
+        box.begin()
+        s:replace{1}
+        local ddl = fiber.create(function()
+            s:create_index('sk')
+        end)
+        ddl:set_joinable(true)
+        t.assert_error_msg_content_equals(errmsg, box.commit)
+        local ok = ddl:join()
+        t.assert(ok)
+    end)
+-- The test checks if rollback of DDL aborts all conflicting transactions since
+-- they rely on new schema which is going to be rolled back. Since we already
+-- abort all transactions on DDL itself and transaction doing DDL cannot yield,
+-- the only way a concurrent transaction can appear is after DDL is prepared but
+-- before it is committed. If DDL wasn't prepared (manual rollback), there
+-- can't be concurrent transactions.
+g.test_abort_all_ddl_rollback = function(cg)
+    t.tarantool.skip_if_not_debug()
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1}}})
+        local errmsg = 'Transaction has been aborted by conflict'
+        box.error.injection.set('ERRINJ_WAL_DELAY', true)
+        local ddl = fiber.create(function()
+            s:create_index('sk')
+        end)
+        ddl:set_joinable(true)
+        box.begin()
+        s:replace{1}
+        box.error.injection.set('ERRINJ_WAL_WRITE', true)
+        box.error.injection.set('ERRINJ_WAL_DELAY', false)
+        local ok = ddl:join()
+        t.assert_not(ok, 'DDL must fail and rollback due to WAL error')
+        box.error.injection.set('ERRINJ_WAL_WRITE', false)
+        t.assert_error_msg_content_equals(errmsg, box.commit)
+    end)
+g.test_rollback_mixed_ddl_and_dml = function(cg)
+    cg.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1}}})
+        for i = 1, 10 do
+            s:replace{i}
+        end
+        local saved_select = s:select{}
+        box.begin()
+        s:replace{1, 1}
+        s:delete{1}
+        s:delete{2}
+        s:insert{11}
+        s:alter({is_sync = true})
+        s:replace{3, 1}
+        s:delete{4}
+        s:delete{5}
+        s:insert{12}
+        s:alter({is_sync = false})
+        box.rollback()
+        t.assert_equals(s:select{}, saved_select)
+    end)
+g.test_rollback_prepared_dml_and_ddl = function(cg)
+    t.tarantool.skip_if_not_debug()
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1}}})
+        for i = 1, 10 do
+            s:replace{i}
+        end
+        local saved_select = s:select{}
+        box.error.injection.set('ERRINJ_WAL_DELAY', true)
+        local dml1 = fiber.create(function()
+            box.begin()
+            s:replace{1, 1}
+            s:replace{2, 2}
+            s:delete{3}
+            s:delete{4}
+            s:insert{100}
+            box.commit()
+        end)
+        dml1:set_joinable(true)
+        local ddl = fiber.create(function()
+            s:create_index('sk')
+        end)
+        ddl:set_joinable(true)
+        local dml2 = fiber.create(function()
+            box.begin()
+            s:replace{4, 4}
+            s:replace{5, 5}
+            s:delete{8}
+            s:insert{200}
+            box.commit()
+        end)
+        dml2:set_joinable(true)
+        box.error.injection.set('ERRINJ_WAL_WRITE', true)
+        box.error.injection.set('ERRINJ_WAL_DELAY', false)
+        local ok = dml1:join()
+        t.assert_not(ok)
+        ok = dml2:join()
+        t.assert_not(ok)
+        ok = ddl:join()
+        t.assert_not(ok)
+        t.assert_equals(s:select{}, saved_select)
+    end)