From f72efbc24c50a0b9784188e8ea680692f8f3daaa Mon Sep 17 00:00:00 2001
From: Andrey Saranchin <Andrey22102001@gmail.com>
Date: Thu, 15 Dec 2022 14:38:04 +0300
Subject: [PATCH] core: drop constraints in two phases

Currently, core constraints are dropped on commit. That is why
it is impossible to drop constraint and drop objects it references to
at the same transaction. Let's drop constraints in two steps - detach
them when DDL occurs, then reattach on rollback or delete on commit.

Closes #7339

NO_DOC=bugfix
---
 .../unreleased/gh_7339_fk_drop_in_one_txn.md  |   4 +
 src/box/alter.cc                              |   9 ++
 src/box/space.c                               |  40 ++++++
 src/box/space.h                               |  12 ++
 src/box/tuple_constraint.c                    |   6 +-
 src/box/tuple_constraint.h                    |  19 ++-
 src/box/tuple_constraint_fkey.c               |  69 ++++++++--
 src/box/tuple_constraint_func.c               |  43 ++++++-
 .../gh_7339_drop_fk_in_same_txn_test.lua      | 120 ++++++++++++++++++
 9 files changed, 295 insertions(+), 27 deletions(-)
 create mode 100644 changelogs/unreleased/gh_7339_fk_drop_in_one_txn.md
 create mode 100644 test/engine-luatest/gh_7339_drop_fk_in_same_txn_test.lua

diff --git a/changelogs/unreleased/gh_7339_fk_drop_in_one_txn.md b/changelogs/unreleased/gh_7339_fk_drop_in_one_txn.md
new file mode 100644
index 0000000000..3d72962c16
--- /dev/null
+++ b/changelogs/unreleased/gh_7339_fk_drop_in_one_txn.md
@@ -0,0 +1,4 @@
+## bugfix/core
+
+* Now referenced space or used function can be dropped in the same
+  transaction with referencing constraint or space (gh-7339).
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 2a8923d4a1..a0e288ce07 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -864,6 +864,7 @@ alter_space_rollback(struct trigger *trigger, void * /* event */)
 	 */
 	space_swap_triggers(alter->new_space, alter->old_space);
 	space_swap_constraint_ids(alter->new_space, alter->old_space);
+	space_reattach_constraints(alter->old_space);
 	space_cache_replace(alter->new_space, alter->old_space);
 	alter_space_delete(alter);
 	return 0;
@@ -983,6 +984,7 @@ alter_space_do(struct txn_stmt *stmt, struct alter_space *alter)
 	 * cache with it.
 	 */
 	space_cache_replace(alter->old_space, alter->new_space);
+	space_detach_constraints(alter->old_space);
 	/*
 	 * Install transaction commit/rollback triggers to either
 	 * finish or rollback the DDL depending on the results of
@@ -1685,6 +1687,7 @@ on_drop_space_rollback(struct trigger *trigger, void *event)
 	(void) event;
 	struct space *space = (struct space *)trigger->data;
 	space_cache_replace(NULL, space);
+	space_reattach_constraints(space);
 	return 0;
 }
 
@@ -2092,6 +2095,12 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
 		/* Check whether old_space is used somewhere. */
 		if (space_check_pinned(old_space) != 0)
 			return -1;
+		/**
+		 * We need to unpin spaces that are referenced by deleted one.
+		 * Let's detach space constraints - they will be deleted
+		 * on commit or reattached on rollback.
+		 */
+		space_detach_constraints(old_space);
 		/**
 		 * The space must be deleted from the space
 		 * cache right away to achieve linearisable
diff --git a/src/box/space.c b/src/box/space.c
index d12f54c530..4cea54cc29 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -177,6 +177,46 @@ space_init_constraints(struct space *space)
 	return 0;
 }
 
+/**
+ * Detach constraints that are defined in @a space format.
+ */
+void
+space_detach_constraints(struct space *space)
+{
+	struct tuple_format *format = space->format;
+	for (size_t j = 0; j < format->constraint_count; j++) {
+		struct tuple_constraint *constr = &format->constraint[j];
+		constr->detach(constr);
+	}
+	for (size_t i = 0; i < tuple_format_field_count(format); i++) {
+		struct tuple_field *field = tuple_format_field(format, i);
+		for (size_t j = 0; j < field->constraint_count; j++) {
+			struct tuple_constraint *constr = &field->constraint[j];
+			constr->detach(constr);
+		}
+	}
+}
+
+/**
+ * Reattach constraints that are defined in @a space format.
+ */
+void
+space_reattach_constraints(struct space *space)
+{
+	struct tuple_format *format = space->format;
+	for (size_t j = 0; j < format->constraint_count; j++) {
+		struct tuple_constraint *constr = &format->constraint[j];
+		constr->reattach(constr);
+	}
+	for (size_t i = 0; i < tuple_format_field_count(format); i++) {
+		struct tuple_field *field = tuple_format_field(format, i);
+		for (size_t j = 0; j < field->constraint_count; j++) {
+			struct tuple_constraint *constr = &field->constraint[j];
+			constr->reattach(constr);
+		}
+	}
+}
+
 /**
  * Destroy constraints that are defined in @a space format.
  */
diff --git a/src/box/space.h b/src/box/space.h
index feca0ca4d1..a676d610a6 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -236,6 +236,18 @@ struct space {
 	struct space_wal_ext *wal_ext;
 };
 
+/**
+ * Detach constraints from space. They can be reattached or deleted then.
+ */
+void
+space_detach_constraints(struct space *space);
+
+/**
+ * Reattach space constraints.
+ */
+void
+space_reattach_constraints(struct space *space);
+
 /** Initialize a base space instance. */
 int
 space_create(struct space *space, struct engine *engine,
diff --git a/src/box/tuple_constraint.c b/src/box/tuple_constraint.c
index 7baa75bcca..3a15edc479 100644
--- a/src/box/tuple_constraint.c
+++ b/src/box/tuple_constraint.c
@@ -21,7 +21,7 @@ tuple_constraint_noop_check(const struct tuple_constraint *constr,
 }
 
 void
-tuple_constraint_noop_destructor(struct tuple_constraint *constr)
+tuple_constraint_noop_alter(struct tuple_constraint *constr)
 {
 	(void)constr;
 }
@@ -58,7 +58,9 @@ tuple_constraint_array_new(const struct tuple_constraint_def *defs,
 	grp_alloc_use(&all, res + count);
 	for (size_t i = 0; i < count; i++) {
 		res[i].check = tuple_constraint_noop_check;
-		res[i].destroy = tuple_constraint_noop_destructor;
+		res[i].destroy = tuple_constraint_noop_alter;
+		res[i].detach = tuple_constraint_noop_alter;
+		res[i].reattach = tuple_constraint_noop_alter;
 		if (defs[i].type != CONSTR_FKEY) {
 			res[i].fkey = NULL;
 			continue;
diff --git a/src/box/tuple_constraint.h b/src/box/tuple_constraint.h
index fbc0e5b06c..4bd86e7fbf 100644
--- a/src/box/tuple_constraint.h
+++ b/src/box/tuple_constraint.h
@@ -32,10 +32,10 @@ typedef int
 		      const struct tuple_field *field);
 
 /**
- * Type of constraint destructor.
+ * Type of constraint alter (destroy, detach, reattach).
  */
 typedef void
-(*tuple_constraint_destroy_f)(struct tuple_constraint *constraint);
+(*tuple_constraint_alter_f)(struct tuple_constraint *constraint);
 
 /**
  * Additional data for each local/foreign field pair in foreign key constraint.
@@ -115,8 +115,15 @@ struct tuple_constraint {
 	struct tuple_constraint_def def;
 	/** The constraint check function. */
 	tuple_constraint_f check;
-	/** Destructor. Expected to be reentrant - it's ok to call it twice.*/
-	tuple_constraint_destroy_f destroy;
+	/** Detach constraint from space, but do not delete it. */
+	tuple_constraint_alter_f detach;
+	/** Reattach constraint to space. */
+	tuple_constraint_alter_f reattach;
+	/**
+	 * Destructor. Expected to be reentrant - it's ok to call it twice.
+	 * Detaches the constraint if it has not beed detached before.
+	 */
+	tuple_constraint_alter_f destroy;
 	/** Space in which the constraint is. */
 	struct space *space;
 	/** Various data for different states of constraint. */
@@ -140,10 +147,10 @@ tuple_constraint_noop_check(const struct tuple_constraint *constraint,
 			    const struct tuple_field *field);
 
 /**
- * No-op destructor of constraint. Used as a default.
+ * No-op alter (destroy, detach, reattach) of constraint. Used as a default.
  */
 void
-tuple_constraint_noop_destructor(struct tuple_constraint *constr);
+tuple_constraint_noop_alter(struct tuple_constraint *constr);
 
 /**
  * Compare two constraint objects, return 0 if they are equal.
diff --git a/src/box/tuple_constraint_fkey.c b/src/box/tuple_constraint_fkey.c
index 69a205f746..03e20e4953 100644
--- a/src/box/tuple_constraint_fkey.c
+++ b/src/box/tuple_constraint_fkey.c
@@ -493,19 +493,6 @@ tuple_constraint_fkey_check_delete(const struct tuple_constraint *constr,
 	return rc;
 }
 
-/**
- * Destructor that unpins space from space_cache.
- */
-static void
-tuple_constraint_fkey_unpin(struct tuple_constraint *constr)
-{
-	assert(constr->destroy == tuple_constraint_fkey_unpin);
-	space_cache_unpin(&constr->space_cache_holder);
-	constr->check = tuple_constraint_noop_check;
-	constr->destroy = tuple_constraint_noop_destructor;
-	constr->space = NULL;
-}
-
 /**
  * Find and set foreign_field_no amd foreign_index fkey member of @a constraint.
  * If something was not found - foreign_index is set to -1.
@@ -580,6 +567,58 @@ tuple_constraint_fkey_space_cache_on_replace(struct space_cache_holder *holder,
 	tuple_constraint_fkey_update_foreign(constr);
 }
 
+/**
+ * Unpin space from space_cache, remove check.
+ */
+static void
+tuple_constraint_fkey_detach(struct tuple_constraint *constr)
+{
+	assert(constr->detach == tuple_constraint_fkey_detach);
+	/* Check that constraint has not been detached yet. */
+	assert(constr->check != tuple_constraint_noop_check);
+	space_cache_unpin(&constr->space_cache_holder);
+	constr->check = tuple_constraint_noop_check;
+}
+
+/**
+ * Put space back to space_cache, put check back.
+ */
+static void
+tuple_constraint_fkey_reattach(struct tuple_constraint *constr)
+{
+	assert(constr->reattach == tuple_constraint_fkey_reattach);
+	/* Check that constraint has been detached. */
+	assert(constr->check == tuple_constraint_noop_check);
+	struct space *space = constr->space;
+	bool fkey_same_space = constr->def.fkey.space_id == 0 ||
+			       constr->def.fkey.space_id == space->def->id;
+	uint32_t space_id = fkey_same_space ? space->def->id :
+			    constr->def.fkey.space_id;
+	struct space *foreign_space = space_by_id(space_id);
+	enum space_cache_holder_type type = SPACE_HOLDER_FOREIGN_KEY;
+	space_cache_pin(foreign_space, &constr->space_cache_holder,
+			tuple_constraint_fkey_space_cache_on_replace,
+			type, fkey_same_space);
+	constr->check = tuple_constraint_fkey_check;
+}
+
+/**
+ * Destructor. Detaches constraint if it has not been detached before and
+ * deinitializes its fields.
+ */
+static void
+tuple_constraint_fkey_destroy(struct tuple_constraint *constr)
+{
+	assert(constr->destroy == tuple_constraint_fkey_destroy);
+	/** Detach constraint if it has not been detached before. */
+	if (constr->check != tuple_constraint_noop_check)
+		tuple_constraint_fkey_detach(constr);
+	constr->detach = tuple_constraint_noop_alter;
+	constr->reattach = tuple_constraint_noop_alter;
+	constr->destroy = tuple_constraint_noop_alter;
+	constr->space = NULL;
+}
+
 int
 tuple_constraint_fkey_init(struct tuple_constraint *constr,
 			   struct space *space, int32_t field_no)
@@ -603,7 +642,9 @@ tuple_constraint_fkey_init(struct tuple_constraint *constr,
 				type, fkey_same_space);
 		tuple_constraint_fkey_update_foreign(constr);
 		constr->check = tuple_constraint_fkey_check;
-		constr->destroy = tuple_constraint_fkey_unpin;
+		constr->destroy = tuple_constraint_fkey_destroy;
+		constr->detach = tuple_constraint_fkey_detach;
+		constr->reattach = tuple_constraint_fkey_reattach;
 		return 0;
 	}
 	if (recovery_state >= FINAL_RECOVERY) {
diff --git a/src/box/tuple_constraint_func.c b/src/box/tuple_constraint_func.c
index afd7a7ba98..405dd1f273 100644
--- a/src/box/tuple_constraint_func.c
+++ b/src/box/tuple_constraint_func.c
@@ -131,15 +131,46 @@ tuple_constraint_call_func(const struct tuple_constraint *constr,
 }
 
 /**
- * Destructor that unpins func from func_cache.
+ * Unpin func from func_cache, removes check.
  */
 static void
-tuple_constraint_func_unpin(struct tuple_constraint *constr)
+tuple_constraint_func_detach(struct tuple_constraint *constr)
 {
-	assert(constr->destroy == tuple_constraint_func_unpin);
+	assert(constr->detach == tuple_constraint_func_detach);
+	/* Check that constraint has not been detached yet. */
+	assert(constr->check != tuple_constraint_noop_check);
 	func_unpin(&constr->func_cache_holder);
 	constr->check = tuple_constraint_noop_check;
-	constr->destroy = tuple_constraint_noop_destructor;
+}
+
+/**
+ * Pin func to func_cache, set check.
+ */
+static void
+tuple_constraint_func_reattach(struct tuple_constraint *constr)
+{
+	assert(constr->reattach == tuple_constraint_func_reattach);
+	/* Check that constraint has been detached. */
+	assert(constr->check == tuple_constraint_noop_check);
+	struct func *func = tuple_constraint_func_find(constr);
+	func_pin(func, &constr->func_cache_holder, FUNC_HOLDER_CONSTRAINT);
+	constr->check = tuple_constraint_call_func;
+}
+
+/**
+ * Destructor. Detaches constraint if it has not been detached before and
+ * deinitializes its fields.
+ */
+static void
+tuple_constraint_func_destroy(struct tuple_constraint *constr)
+{
+	assert(constr->destroy == tuple_constraint_func_destroy);
+	/** Detach constraint if it has not been detached before. */
+	if (constr->check != tuple_constraint_noop_check)
+		tuple_constraint_func_detach(constr);
+	constr->detach = tuple_constraint_noop_alter;
+	constr->reattach = tuple_constraint_noop_alter;
+	constr->destroy = tuple_constraint_noop_alter;
 	constr->space = NULL;
 }
 
@@ -165,6 +196,8 @@ tuple_constraint_func_init(struct tuple_constraint *constr,
 	}
 	func_pin(func, &constr->func_cache_holder, FUNC_HOLDER_CONSTRAINT);
 	constr->check = tuple_constraint_call_func;
-	constr->destroy = tuple_constraint_func_unpin;
+	constr->destroy = tuple_constraint_func_destroy;
+	constr->detach = tuple_constraint_func_detach;
+	constr->reattach = tuple_constraint_func_reattach;
 	return 0;
 }
diff --git a/test/engine-luatest/gh_7339_drop_fk_in_same_txn_test.lua b/test/engine-luatest/gh_7339_drop_fk_in_same_txn_test.lua
new file mode 100644
index 0000000000..7cd861908c
--- /dev/null
+++ b/test/engine-luatest/gh_7339_drop_fk_in_same_txn_test.lua
@@ -0,0 +1,120 @@
+local t = require('luatest')
+local server = require('luatest.server')
+
+local engines = {'memtx', 'vinyl'}
+
+local g = t.group('Drop constr and referenced obj in txn', t.helpers.matrix{
+    engine_a = engines, engine_b = engines,
+    alter = {true, false}
+})
+
+g.before_all(function(cg)
+    cg.server = server:new({alias = 'master'})
+    cg.server:start()
+end)
+
+g.after_all(function(cg)
+    cg.server:drop()
+end)
+
+g.before_each(function(cg)
+    cg.server:exec(function(engine_a, engine_b)
+        box.schema.func.create('ck1', {
+            is_deterministic = true,
+            body = "function(x) return x[1] > 5 end"
+        })
+        box.schema.func.create('ck2', {
+            is_deterministic = true,
+            body = "function(x) return x < 20 end"
+        })
+        local a = box.schema.space.create('a', {
+            format = {
+                {name = 'i', type = 'integer'},
+                {name = 'j', type = 'integer'}
+            },
+            engine = engine_a
+        })
+        a:create_index('pk', {parts = {{1}}})
+        a:create_index('sk', {parts = {{2}}})
+        local b = box.schema.space.create('b', {
+            constraint = "ck1",
+            foreign_key = {tup_fk = {space = 'a', field = {[2] = 2}}},
+            format = {
+                {
+                    name = 'i', type = 'integer',
+                    foreign_key = {space = 'a', field = 'i'},
+                    constraint = "ck2"
+                },
+                {name = 'j', type = 'integer'}
+            },
+            engine = engine_b
+        })
+        b:create_index('pk', {parts = {{1}}})
+        b:create_index('sk', {parts = {{2}}})
+    end, {cg.params.engine_a, cg.params.engine_b})
+end)
+
+g.after_each(function(cg)
+    cg.server:exec(function()
+        if box.space.a ~= nil then box.space.a:drop() end
+        if box.space.b ~= nil then box.space.b:drop() end
+        if box.func.ck1 ~= nil then box.func.ck1:drop() end
+    end)
+end)
+
+g.test_constraints_alter_in_txn = function(cg)
+    cg.server:exec(function(alter)
+        local t = require('luatest')
+        local a = box.space.a
+        local b = box.space.b
+        box.begin()
+        if alter then
+            b:alter({
+                foreign_key = {},
+                constraint = {},
+                format = {
+                    {name = 'i', type = 'integer'},
+                    {name = 'j', type = 'integer', is_nullable = true}
+                }
+            })
+        else
+            b:drop()
+        end
+        a:drop()
+        box.func.ck1:drop()
+        box.rollback()
+        a:insert{100, 100}
+        -- Will fail with error
+        local msg = "Foreign key constraint 'a' failed for field '1 (i)': " ..
+            "foreign tuple was not found"
+        t.assert_error_msg_content_equals(msg, b.insert, b, {10, 100})
+        a:insert{10, 200}
+        msg = "Foreign key constraint 'tup_fk' failed: " ..
+            "foreign tuple was not found"
+        t.assert_error_msg_content_equals(msg, b.insert, b, {10, 10})
+        a:replace{10, 10}
+        b:replace{10, 10}
+        a:insert{0, 0}
+        msg = "Check constraint 'ck1' failed for tuple"
+        t.assert_error_msg_content_equals(msg, b.insert, b, {0, 0})
+        a:insert{30, 30}
+        msg = "Check constraint 'ck2' failed for field '1 (i)'"
+        t.assert_error_msg_content_equals(msg, b.insert, b, {30, 30})
+        box.begin()
+        if alter then
+            b:alter({
+                foreign_key = {},
+                constraint = {},
+                format = {
+                    {name = 'i', type = 'integer'},
+                    {name = 'j', type = 'integer', is_nullable = true}
+                }
+            })
+        else
+            b:drop()
+        end
+        a:drop()
+        box.func.ck1:drop()
+        box.commit()
+    end, {cg.params.alter})
+end
-- 
GitLab