From 8631ffb24c67b2597763934ed1fc8eaae1e16da1 Mon Sep 17 00:00:00 2001
From: GeorgyKirichenko <kirichenkoga@gmail.com>
Date: Fri, 7 Jul 2017 15:42:42 +0300
Subject: [PATCH] Altering a space takes effect immediately.

Before this patch, the new space, created by alter specification,
would be put into space cache only after successful WAL write.

This behaviour is not linearizable: on a replica, the WAL is
played sequentially, and the order of events could differ from the
master.

Besides, it could crash, as demonstrated in gh-2074 test case.

Since we use a cascading rollback for all transactions on WAL
write error, it's OK to put a space into space cache
before WAL write, so that the new transactions apply to the new
space.

This patch does exactly that.

All subsequent requests are executed against the new space.

This patch also removes on_replace trigger in the old space, since
all checks against the new tuple format are performed using the new
space.

Fixes #2074.
---
 src/box/alter.cc      | 213 +++++++++++++++++++-----------------------
 test/box/ddl.result   |  56 ++++++++++-
 test/box/ddl.test.lua |  33 ++++++-
 3 files changed, 178 insertions(+), 124 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index 6a8749b505..587f77823e 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -463,6 +463,7 @@ class AlterSpaceOp {
 	virtual void alter_def(struct alter_space * /* alter */) {}
 	virtual void alter(struct alter_space * /* alter */) {}
 	virtual void commit(struct alter_space * /* alter */) {}
+	virtual void rollback(struct alter_space * /* alter */) {}
 	virtual ~AlterSpaceOp() {}
 	template <typename T> static T *create();
 	static void destroy(AlterSpaceOp *op);
@@ -562,32 +563,13 @@ alter_space_commit(struct trigger *trigger, void * /* event */)
 	rlist_foreach_entry(op, &alter->ops, link) {
 		op->commit(alter);
 	}
-	/* Rebuild index maps once for all indexes. */
-	space_fill_index_map(alter->old_space);
-	space_fill_index_map(alter->new_space);
-	/*
-	 * Don't forget about space triggers.
-	 */
-	rlist_swap(&alter->new_space->on_replace,
-		   &alter->old_space->on_replace);
-	rlist_swap(&alter->new_space->on_stmt_begin,
-		   &alter->old_space->on_stmt_begin);
-	/*
-	 * Init space bsize.
-	 */
-	if (alter->new_space->index_count != 0)
-		alter->new_space->bsize = alter->old_space->bsize;
 	/*
 	 * The new space is ready. Time to update the space
 	 * cache with it.
 	 */
-	alter->old_space->handler->commitAlterSpace(alter->old_space,
-						    alter->new_space);
 
-	struct space *old_space = space_cache_replace(alter->new_space);
 	alter->new_space = NULL; /* for alter_space_delete(). */
-	assert(old_space == alter->old_space);
-	space_delete(old_space);
+	space_delete(alter->old_space);
 	alter_space_delete(alter);
 }
 
@@ -603,6 +585,23 @@ static void
 alter_space_rollback(struct trigger *trigger, void * /* event */)
 {
 	struct alter_space *alter = (struct alter_space *) trigger->data;
+	/* Rollback alter ops */
+	class AlterSpaceOp *op;
+	rlist_foreach_entry(op, &alter->ops, link) {
+		op->rollback(alter);
+	}
+	/* Rebuild index maps once for all indexes. */
+	space_fill_index_map(alter->old_space);
+	space_fill_index_map(alter->new_space);
+	/*
+	 * Don't forget about space triggers.
+	 */
+	rlist_swap(&alter->new_space->on_replace,
+		   &alter->old_space->on_replace);
+	rlist_swap(&alter->new_space->on_stmt_begin,
+		   &alter->old_space->on_stmt_begin);
+	struct space *new_space = space_cache_replace(alter->old_space);
+	assert(new_space == alter->new_space);
 	alter_space_delete(alter);
 }
 
@@ -669,8 +668,51 @@ alter_space_do(struct txn *txn, struct alter_space *alter)
 	 * Change the new space: build the new index, rename,
 	 * change the fixed field count.
 	 */
-	rlist_foreach_entry(op, &alter->ops, link)
-		op->alter(alter);
+	try {
+		rlist_foreach_entry(op, &alter->ops, link)
+			op->alter(alter);
+	} catch (Exception *e) {
+		/*
+		 * Undo space changes from the last successful
+		 * operation back to the first. Skip the operation
+		 * which failed. An operation may fail during
+		 * alter if, e.g. if it adds a unique key and
+		 * there is a duplicate.
+		 */
+		while (op != rlist_first_entry(&alter->ops,
+					       class AlterSpaceOp, link)) {
+			op = rlist_prev_entry(prev, link);
+			op->rollback(alter);
+		}
+		throw;
+	}
+
+	/* Rebuild index maps once for all indexes. */
+	space_fill_index_map(alter->old_space);
+	space_fill_index_map(alter->new_space);
+	/*
+	 * Don't forget about space triggers.
+	 */
+	rlist_swap(&alter->new_space->on_replace,
+		   &alter->old_space->on_replace);
+	rlist_swap(&alter->new_space->on_stmt_begin,
+		   &alter->old_space->on_stmt_begin);
+	/*
+	 * Init space bsize.
+	 */
+	if (alter->new_space->index_count != 0)
+		alter->new_space->bsize = alter->old_space->bsize;
+	/*
+	 * The new space is ready. Time to update the space
+	 * cache with it.
+	 */
+	alter->old_space->handler->commitAlterSpace(alter->old_space,
+						    alter->new_space);
+
+	struct space *old_space = space_cache_replace(alter->new_space);
+	(void) old_space;
+	assert(old_space == alter->old_space);
+
 	/*
 	 * Install transaction commit/rollback triggers to either
 	 * finish or rollback the DDL depending on the results of
@@ -765,11 +807,17 @@ class MoveIndex: public AlterSpaceOp
 public:
 	/** id of the index on the move. */
 	uint32_t iid;
-	virtual void commit(struct alter_space *alter);
+	virtual void alter(struct alter_space *alter);
+	virtual void rollback(struct alter_space *alter);
 };
+void
+MoveIndex::alter(struct alter_space *alter)
+{
+	space_swap_index(alter->old_space, alter->new_space, iid, iid);
+}
 
 void
-MoveIndex::commit(struct alter_space *alter)
+MoveIndex::rollback(struct alter_space *alter)
 {
 	space_swap_index(alter->old_space, alter->new_space, iid, iid);
 }
@@ -784,7 +832,8 @@ class ModifyIndex: public AlterSpaceOp
 	struct index_def *new_index_def;
 	struct index_def *old_index_def;
 	virtual void alter_def(struct alter_space *alter);
-	virtual void commit(struct alter_space *alter);
+	virtual void alter(struct alter_space *alter);
+	virtual void rollback(struct alter_space *alter);
 	virtual ~ModifyIndex();
 };
 
@@ -796,9 +845,8 @@ ModifyIndex::alter_def(struct alter_space *alter)
 	rlist_add_entry(&alter->key_list, new_index_def, link);
 }
 
-/** Move the index from the old space to the new one. */
 void
-ModifyIndex::commit(struct alter_space *alter)
+ModifyIndex::alter(struct alter_space *alter)
 {
 	assert(old_index_def->iid == new_index_def->iid);
 	/*
@@ -814,6 +862,24 @@ ModifyIndex::commit(struct alter_space *alter)
 	index_def_swap(old_index->index_def, new_index->index_def);
 }
 
+void
+ModifyIndex::rollback(struct alter_space *alter)
+{
+	assert(old_index_def->iid == new_index_def->iid);
+	/*
+	 * Restore indexes.
+	 */
+	space_swap_index(alter->old_space, alter->new_space,
+			 old_index_def->iid, new_index_def->iid);
+	Index *old_index = space_index(alter->old_space, old_index_def->iid);
+	assert(old_index != NULL);
+	Index *new_index = space_index(alter->new_space, new_index_def->iid);
+	assert(new_index != NULL);
+	struct index_def *tmp = old_index->index_def;
+	old_index->index_def = new_index->index_def;
+	new_index->index_def = tmp;
+}
+
 ModifyIndex::~ModifyIndex()
 {
 	index_def_delete(new_index_def);
@@ -824,7 +890,6 @@ class CreateIndex: public AlterSpaceOp {
 public:
 	/** New index index_def. */
 	struct index_def *new_index_def;
-	struct trigger *on_replace;
 	virtual void alter_def(struct alter_space *alter);
 	virtual void alter(struct alter_space *alter);
 	virtual void commit(struct alter_space *alter);
@@ -838,62 +903,6 @@ CreateIndex::alter_def(struct alter_space *alter)
 	rlist_add_tail_entry(&alter->key_list, new_index_def, link);
 }
 
-/**
- * A trigger invoked on rollback in old space while the record
- * about alter is being written to the WAL.
- */
-static void
-on_rollback_in_old_space(struct trigger *trigger, void *event)
-{
-	struct txn *txn = (struct txn *) event;
-	Index *new_index = (Index *) trigger->data;
-	/* Remove the failed tuple from the new index. */
-	struct txn_stmt *stmt;
-	stailq_foreach_entry(stmt, &txn->stmts, next) {
-		if (stmt->space->def.id != new_index->index_def->space_id)
-			continue;
-		new_index->replace(stmt->new_tuple, stmt->old_tuple,
-				   DUP_INSERT);
-	}
-}
-
-/**
- * Add to index trigger -- invoked on any change in the old space,
- * while the CreateIndex tuple is being written to the WAL. The job
- * of this trigger is to keep the added index up to date with the
- * state of the primary key in the old space.
- *
- * Initially it's installed as old_space->on_replace trigger, and
- * for each successfully replaced tuple in the new index,
- * a trigger is added to txn->on_rollback list to remove the tuple
- * from the new index if the transaction rolls back.
- *
- * The trigger is removed when alter operation commits/rolls back.
- */
-static void
-on_replace_in_old_space(struct trigger *trigger, void *event)
-{
-	struct txn *txn = (struct txn *) event;
-	struct txn_stmt *stmt = txn_current_stmt(txn);
-	Index *new_index = (Index *) trigger->data;
-	/*
-	 * First set a rollback trigger, then do replace, since
-	 * creating the trigger may fail.
-	 */
-	struct trigger *on_rollback =
-		txn_alter_trigger_new(on_rollback_in_old_space, new_index);
-	/*
-	 * In a multi-statement transaction the same space
-	 * may be modified many times, but we need only one
-	 * on_rollback trigger.
-	 */
-	txn_init_triggers(txn);
-	trigger_add_unique(&txn->on_rollback, on_rollback);
-	/* Put the tuple into the new index. */
-	(void) new_index->replace(stmt->old_tuple, stmt->new_tuple,
-				  DUP_INSERT);
-}
-
 /**
  * Optionally build the new index.
  *
@@ -927,10 +936,7 @@ CreateIndex::alter(struct alter_space *alter)
 	 * Get the new index and build it.
 	 */
 	Index *new_index = index_find_xc(alter->new_space, new_index_def->iid);
-	handler->buildSecondaryKey(alter->old_space, alter->new_space, new_index);
-	on_replace = txn_alter_trigger_new(on_replace_in_old_space,
-					   new_index);
-	trigger_add(&alter->old_space->on_replace, on_replace);
+	handler->buildSecondaryKey(alter->new_space, alter->new_space, new_index);
 }
 
 void
@@ -942,13 +948,6 @@ CreateIndex::commit(struct alter_space *alter)
 
 CreateIndex::~CreateIndex()
 {
-	/*
-	 * The trigger by now may reside in the new space (on
-	 * commit) or in the old space (rollback). Remove it
-	 * from the list, wherever it is.
-	 */
-	if (on_replace)
-		trigger_clear(on_replace);
 	if (new_index_def)
 		index_def_delete(new_index_def);
 }
@@ -964,10 +963,8 @@ class RebuildIndex: public AlterSpaceOp {
 	struct index_def *new_index_def;
 	/** Old index index_def. */
 	struct index_def *old_index_def;
-	struct trigger *on_replace;
 	virtual void alter_def(struct alter_space *alter);
 	virtual void alter(struct alter_space *alter);
-	virtual void commit(struct alter_space *alter);
 	virtual ~RebuildIndex();
 };
 
@@ -987,29 +984,13 @@ RebuildIndex::alter(struct alter_space *alter)
 	/* Get the new index and build it.  */
 	Index *new_index = space_index(alter->new_space, new_index_def->iid);
 	assert(new_index != NULL);
-	handler->buildSecondaryKey(alter->old_space, alter->new_space, new_index);
-	on_replace = txn_alter_trigger_new(on_replace_in_old_space,
-					   new_index);
-	trigger_add(&alter->old_space->on_replace, on_replace);
-}
-
-void
-RebuildIndex::commit(struct alter_space *alter)
-{
-	Index *new_index = space_index(alter->new_space, new_index_def->iid);
-	assert(new_index != NULL);
-	new_index->commitCreate();
+	handler->buildSecondaryKey(new_index_def->iid != 0 ?
+				   alter->new_space : alter->old_space,
+				   alter->new_space, new_index);
 }
 
 RebuildIndex::~RebuildIndex()
 {
-	/*
-	 * The trigger by now may reside in the new space (on
-	 * commit) or in the old space (rollback). Remove it
-	 * from the list, wherever it is.
-	 */
-	if (on_replace)
-		trigger_clear(on_replace);
 	if (new_index_def)
 		index_def_delete(new_index_def);
 }
diff --git a/test/box/ddl.result b/test/box/ddl.result
index 5d5cd8b18a..8ba8147ee8 100644
--- a/test/box/ddl.result
+++ b/test/box/ddl.result
@@ -19,14 +19,14 @@ test_run:cmd("setopt delimiter ';'")
 - true
 ...
 function f1()
-  box.space.test:create_index('sec', {parts = {2, 'num'}})
-  ch:put(true)
+    box.space.test:create_index('sec', {parts = {2, 'num'}})
+    ch:put(true)
 end;
 ---
 ...
 function f2()
-  box.space.test:create_index('third', {parts = {3, 'string'}})
-  ch:put(true)
+    box.space.test:create_index('third', {parts = {3, 'string'}})
+    ch:put(true)
 end;
 ---
 ...
@@ -115,3 +115,51 @@ space:select()
 space:drop()
 ---
 ...
+ch = fiber.channel(3)
+---
+...
+_ = box.schema.space.create('test'):create_index('pk')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function add_index()
+    box.space.test:create_index('sec', {parts = {2, 'num'}})
+    ch:put(true)
+end;
+---
+...
+function insert_tuple(tuple)
+    ch:put({pcall(box.space.test.replace, box.space.test, tuple)})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = {fiber.create(insert_tuple, {1, 2, 'a'}), fiber.create(add_index), fiber.create(insert_tuple, {2, '3', 'b'})}
+---
+...
+{ch:get(), ch:get(), ch:get()}
+---
+- - - false
+    - 'Tuple field 2 type does not match one required by operation: expected unsigned'
+  - - true
+    - [1, 2, 'a']
+  - true
+...
+box.space.test:select()
+---
+- - [1, 2, 'a']
+...
+test_run:cmd('restart server default')
+box.space.test:select()
+---
+- - [1, 2, 'a']
+...
+box.space.test:drop()
+---
+...
diff --git a/test/box/ddl.test.lua b/test/box/ddl.test.lua
index bddc358d08..610be07624 100644
--- a/test/box/ddl.test.lua
+++ b/test/box/ddl.test.lua
@@ -11,13 +11,13 @@ ch = fiber.channel(2)
 test_run:cmd("setopt delimiter ';'")
 
 function f1()
-  box.space.test:create_index('sec', {parts = {2, 'num'}})
-  ch:put(true)
+    box.space.test:create_index('sec', {parts = {2, 'num'}})
+    ch:put(true)
 end;
 
 function f2()
-  box.space.test:create_index('third', {parts = {3, 'string'}})
-  ch:put(true)
+    box.space.test:create_index('third', {parts = {3, 'string'}})
+    ch:put(true)
 end;
 
 test_run:cmd("setopt delimiter ''");
@@ -60,3 +60,28 @@ space:select()
 space:drop()
 
 
+ch = fiber.channel(3)
+
+_ = box.schema.space.create('test'):create_index('pk')
+
+test_run:cmd("setopt delimiter ';'")
+function add_index()
+    box.space.test:create_index('sec', {parts = {2, 'num'}})
+    ch:put(true)
+end;
+
+function insert_tuple(tuple)
+    ch:put({pcall(box.space.test.replace, box.space.test, tuple)})
+end;
+test_run:cmd("setopt delimiter ''");
+
+_ = {fiber.create(insert_tuple, {1, 2, 'a'}), fiber.create(add_index), fiber.create(insert_tuple, {2, '3', 'b'})}
+{ch:get(), ch:get(), ch:get()}
+
+box.space.test:select()
+
+test_run:cmd('restart server default')
+
+box.space.test:select()
+box.space.test:drop()
+
-- 
GitLab