From c4781f9394e3d1e0878b1d726c6dd4f9db54ca11 Mon Sep 17 00:00:00 2001
From: Kirill Shcherbatov <kshcherbatov@tarantool.org>
Date: Mon, 16 Sep 2019 13:20:34 +0300
Subject: [PATCH] sql: add an ability to disable CK constraints

Closes #4244

@TarantoolBot document
Title: an ability to disable CK constraints

Now it is possible to disable and enable ck constraints.
All ck constraints are enabled by default when Tarantool is
configured. Ck constraints checks are not performed during
standard recovery, but performed during force_recovery -
all conflicting tuples are skipped in case of ck_constraint
conflict.

To change CK constraint "is_enabled" state, call
-- in LUA
ck_obj:enable(new_state in {true, false})
-- in SQL
ALTER TABLE {TABLE_NAME} {EN, DIS}ABLE CHECK CONSTRAINT {CK_NAME};

Example:
box.space.T6.ck_constraint.ck_unnamed_T6_1:enable(false)
box.space.T6.ck_constraint.ck_unnamed_T6_1
- space_id: 512
  is_enabled: false
  name: ck_unnamed_T6_1
  expr: a < 10
box.space.T6:insert({11})
-- passed
box.execute("ALTER TABLE t6 ENABLE CHECK CONSTRAINT \"ck_unnamed_T6_1\"")
box.space.T6:insert({12})
- error: 'Check constraint failed ''ck_unnamed_T6_1'': a < 10'
---
 extra/mkkeywordhash.c          |  2 ++
 src/box/sql/alter.c            | 62 ++++++++++++++++++++++++++++++++++
 src/box/sql/parse.y            | 12 ++++++-
 src/box/sql/parse_def.h        | 22 +++++++++++-
 src/box/sql/sqlInt.h           | 10 ++++++
 test/sql-tap/keyword1.test.lua |  4 ++-
 test/sql/checks.result         | 49 +++++++++++++++++++++++++++
 test/sql/checks.test.lua       | 15 ++++++++
 8 files changed, 173 insertions(+), 3 deletions(-)

diff --git a/extra/mkkeywordhash.c b/extra/mkkeywordhash.c
index 2a59d6a613..3c3de4cca0 100644
--- a/extra/mkkeywordhash.c
+++ b/extra/mkkeywordhash.c
@@ -89,6 +89,7 @@ static Keyword aKeywordTable[] = {
   { "DEFERRED",               "TK_DEFERRED",    false },
   { "DEFERRABLE",             "TK_DEFERRABLE",  false },
   { "DELETE",                 "TK_DELETE",      true  },
+  { "DISABLE",                "TK_DISABLE",     false },
   { "DESC",                   "TK_DESC",        true  },
   { "DISTINCT",               "TK_DISTINCT",    true  },
   { "DROP",                   "TK_DROP",        true  },
@@ -203,6 +204,7 @@ static Keyword aKeywordTable[] = {
   { "DETERMINISTIC",          "TK_STANDARD",    true  },
   { "DOUBLE",                 "TK_STANDARD",    true  },
   { "ELSEIF",                 "TK_STANDARD",    true  },
+  { "ENABLE",                 "TK_ENABLE",      false },
   { "FETCH",                  "TK_STANDARD",    true  },
   { "FLOAT",                  "TK_STANDARD",    true  },
   { "FUNCTION",               "TK_STANDARD",    true  },
diff --git a/src/box/sql/alter.c b/src/box/sql/alter.c
index 7656001866..973b420cf5 100644
--- a/src/box/sql/alter.c
+++ b/src/box/sql/alter.c
@@ -79,6 +79,68 @@ sql_alter_table_rename(struct Parse *parse)
 	goto exit_rename_table;
 }
 
+void
+sql_alter_ck_constraint_enable(struct Parse *parse)
+{
+	struct enable_entity_def *enable_def = &parse->enable_entity_def;
+	struct SrcList *src_tab = enable_def->base.entity_name;
+	assert(enable_def->base.entity_type == ENTITY_TYPE_CK);
+	assert(enable_def->base.alter_action == ALTER_ACTION_ENABLE);
+	assert(src_tab->nSrc == 1);
+	struct sql *db = parse->db;
+
+	char *constraint_name = NULL;
+	const char *tbl_name = src_tab->a[0].zName;
+	struct space *space = space_by_name(tbl_name);
+	if (space == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_SPACE, tbl_name);
+		parse->is_aborted = true;
+		goto exit_alter_ck_constraint;
+	}
+
+	constraint_name = sql_name_from_token(db, &enable_def->name);
+	if (constraint_name == NULL) {
+		parse->is_aborted = true;
+		goto exit_alter_ck_constraint;
+	}
+
+	struct Vdbe *v = sqlGetVdbe(parse);
+	if (v == NULL)
+		goto exit_alter_ck_constraint;
+
+	struct space *ck_space = space_by_id(BOX_CK_CONSTRAINT_ID);
+	assert(ck_space != NULL);
+	int cursor = parse->nTab++;
+	vdbe_emit_open_cursor(parse, cursor, 0, ck_space);
+	sqlVdbeChangeP5(v, OPFLAG_SYSTEMSP);
+
+	int key_reg = sqlGetTempRange(parse, 2);
+	sqlVdbeAddOp2(v, OP_Integer, space->def->id, key_reg);
+	sqlVdbeAddOp4(v, OP_String8, 0, key_reg + 1, 0,
+		      sqlDbStrDup(db, constraint_name), P4_DYNAMIC);
+	int addr = sqlVdbeAddOp4Int(v, OP_Found, cursor, 0, key_reg, 2);
+	sqlVdbeAddOp4(v, OP_SetDiag, ER_NO_SUCH_CONSTRAINT, 0, 0,
+		      sqlMPrintf(db, tnt_errcode_desc(ER_NO_SUCH_CONSTRAINT),
+				 constraint_name), P4_DYNAMIC);
+	sqlVdbeAddOp2(v, OP_Halt, -1, ON_CONFLICT_ACTION_ABORT);
+	sqlVdbeJumpHere(v, addr);
+
+	const int field_count = 6;
+	int tuple_reg = sqlGetTempRange(parse, field_count + 1);
+	for (int i = 0; i < field_count - 1; ++i)
+		sqlVdbeAddOp3(v, OP_Column, cursor, i, tuple_reg + i);
+	sqlVdbeAddOp1(v, OP_Close, cursor);
+	sqlVdbeAddOp2(v, OP_Bool, enable_def->is_enabled,
+		      tuple_reg + field_count - 1);
+	sqlVdbeAddOp3(v, OP_MakeRecord, tuple_reg, field_count,
+		      tuple_reg + field_count);
+	sqlVdbeAddOp4(v, OP_IdxReplace, tuple_reg + field_count, 0, 0,
+		      (char *)ck_space, P4_SPACEPTR);
+exit_alter_ck_constraint:
+	sqlDbFree(db, constraint_name);
+	sqlSrcListDelete(db, src_tab);
+}
+
 /* This function is used to implement the ALTER TABLE command.
  * The table name in the CREATE TRIGGER statement is replaced with the third
  * argument and the result returned. This is analagous to rename_table()
diff --git a/src/box/sql/parse.y b/src/box/sql/parse.y
index ed59a875a4..1d0c95fac5 100644
--- a/src/box/sql/parse.y
+++ b/src/box/sql/parse.y
@@ -260,7 +260,7 @@ columnname(A) ::= nm(A) typedef(Y). {sqlAddColumn(pParse,&A,&Y);}
   CONFLICT DEFERRED END ENGINE FAIL
   IGNORE INITIALLY INSTEAD NO MATCH PLAN
   QUERY KEY OFFSET RAISE RELEASE REPLACE RESTRICT
-  RENAME CTIME_KW IF
+  RENAME CTIME_KW IF ENABLE DISABLE
   .
 %wildcard ANY.
 
@@ -798,6 +798,10 @@ col_list_with_autoinc(A) ::= expr(Y) autoinc(I). {
   A = sql_expr_list_append(pParse->db, NULL, Y.pExpr);
 }
 
+%type enable {bool}
+enable(A) ::= ENABLE.           {A = true;}
+enable(A) ::= DISABLE.          {A = false;}
+
 %type sortorder {int}
 
 sortorder(A) ::= ASC.           {A = SORT_ORDER_ASC;}
@@ -1785,6 +1789,12 @@ cmd ::= ALTER TABLE fullname(X) DROP CONSTRAINT nm(Z). {
   sql_drop_foreign_key(pParse);
 }
 
+cmd ::= alter_table_start(A) enable(E) CHECK CONSTRAINT nm(Z). {
+    enable_entity_def_init(&pParse->enable_entity_def, ENTITY_TYPE_CK, A,
+                           &Z, E);
+    sql_alter_ck_constraint_enable(pParse);
+}
+
 //////////////////////// COMMON TABLE EXPRESSIONS ////////////////////////////
 %type with {With*}
 %type wqlist {With*}
diff --git a/src/box/sql/parse_def.h b/src/box/sql/parse_def.h
index df1238b9e2..2f433e4c05 100644
--- a/src/box/sql/parse_def.h
+++ b/src/box/sql/parse_def.h
@@ -169,7 +169,8 @@ enum entity_type {
 enum alter_action {
 	ALTER_ACTION_CREATE = 0,
 	ALTER_ACTION_DROP,
-	ALTER_ACTION_RENAME
+	ALTER_ACTION_RENAME,
+	ALTER_ACTION_ENABLE,
 };
 
 struct alter_entity_def {
@@ -181,6 +182,14 @@ struct alter_entity_def {
 	struct SrcList *entity_name;
 };
 
+struct enable_entity_def {
+	struct alter_entity_def base;
+	/** Name of constraint to be enabled/disabled. */
+	struct Token name;
+	/** A new state to be set for entity found. */
+	bool is_enabled;
+};
+
 struct rename_entity_def {
 	struct alter_entity_def base;
 	struct Token new_name;
@@ -327,6 +336,17 @@ rename_entity_def_init(struct rename_entity_def *rename_def,
 	rename_def->new_name = *new_name;
 }
 
+static inline void
+enable_entity_def_init(struct enable_entity_def *enable_def,
+		       enum entity_type type, struct SrcList *parent_name,
+		       struct Token *name, bool is_enabled)
+{
+	alter_entity_def_init(&enable_def->base, parent_name, type,
+			      ALTER_ACTION_ENABLE);
+	enable_def->name = *name;
+	enable_def->is_enabled = is_enabled;
+}
+
 static inline void
 create_entity_def_init(struct create_entity_def *create_def,
 		       enum entity_type type, struct SrcList *parent_name,
diff --git a/src/box/sql/sqlInt.h b/src/box/sql/sqlInt.h
index 1b6d92cb18..c32cacc046 100644
--- a/src/box/sql/sqlInt.h
+++ b/src/box/sql/sqlInt.h
@@ -2202,6 +2202,7 @@ struct Parse {
 		struct drop_table_def drop_table_def;
 		struct drop_trigger_def drop_trigger_def;
 		struct drop_view_def drop_view_def;
+		struct enable_entity_def enable_entity_def;
 	};
 	/**
 	 * Table def is not part of union since information
@@ -3969,6 +3970,15 @@ extern int sqlPendingByte;
 void
 sql_alter_table_rename(struct Parse *parse);
 
+/**
+ * Generate code to implement the "ALTER TABLE xxx ENABLE/DISABLE
+ * CHECK CONSTRAINT" command.
+ *
+ * @param parse Current parsing context.
+ */
+void
+sql_alter_ck_constraint_enable(struct Parse *parse);
+
 /**
  * Return the length (in bytes) of the token that begins at z[0].
  * Store the token type in *type before returning.
diff --git a/test/sql-tap/keyword1.test.lua b/test/sql-tap/keyword1.test.lua
index 03b1054d25..571e1dcbe7 100755
--- a/test/sql-tap/keyword1.test.lua
+++ b/test/sql-tap/keyword1.test.lua
@@ -1,6 +1,6 @@
 #!/usr/bin/env tarantool
 test = require("sqltester")
-test:plan(180)
+test:plan(184)
 
 --!./tcltestrunner.lua
 -- 2009 January 29
@@ -44,6 +44,8 @@ local kwlist = {
 	"query",
 	"restrict",
 	"raise",
+	"enable",
+	"disable"
 }
 
 local bannedkws = { 
diff --git a/test/sql/checks.result b/test/sql/checks.result
index 72b1fa26f7..6d584c3013 100644
--- a/test/sql/checks.result
+++ b/test/sql/checks.result
@@ -790,6 +790,55 @@ box.space.TEST:insert({13})
 box.space.TEST:drop()
 ---
 ...
+--
+-- Test ENABLE/DISABLE CK constraints from SQL works.
+--
+box.execute("ALTER TABLE falsch DISABLE CHECK CONSTRAINT \"some_ck\"")
+---
+- null
+- Space 'FALSCH' does not exist
+...
+box.execute("CREATE TABLE test(a INT PRIMARY KEY);");
+---
+- row_count: 1
+...
+box.execute('ALTER TABLE test ADD CONSTRAINT \"some_ck\" CHECK(a < 10);')
+---
+- row_count: 1
+...
+box.execute("ALTER TABLE test DISABLE CHECK CONSTRAINT \"falsch\"")
+---
+- null
+- Constraint falsch does not exist
+...
+box.execute("ALTER TABLE test DISABLE CHECK CONSTRAINT \"some_ck\"")
+---
+- row_count: 0
+...
+assert(box.space.TEST.ck_constraint.some_ck.is_enabled == false)
+---
+- true
+...
+box.space.TEST:insert({11})
+---
+- [11]
+...
+box.execute("ALTER TABLE test ENABLE CHECK CONSTRAINT \"some_ck\"")
+---
+- row_count: 0
+...
+assert(box.space.TEST.ck_constraint.some_ck.is_enabled == true)
+---
+- true
+...
+box.space.TEST:insert({12})
+---
+- error: 'Check constraint failed ''some_ck'': a < 10'
+...
+box.execute("DROP TABLE test;")
+---
+- row_count: 1
+...
 test_run:cmd("clear filter")
 ---
 - true
diff --git a/test/sql/checks.test.lua b/test/sql/checks.test.lua
index 11918dcedd..f8dc5d3638 100644
--- a/test/sql/checks.test.lua
+++ b/test/sql/checks.test.lua
@@ -257,4 +257,19 @@ assert(box.space.TEST.ck_constraint.CK.is_enabled == true)
 box.space.TEST:insert({13})
 box.space.TEST:drop()
 
+--
+-- Test ENABLE/DISABLE CK constraints from SQL works.
+--
+box.execute("ALTER TABLE falsch DISABLE CHECK CONSTRAINT \"some_ck\"")
+box.execute("CREATE TABLE test(a INT PRIMARY KEY);");
+box.execute('ALTER TABLE test ADD CONSTRAINT \"some_ck\" CHECK(a < 10);')
+box.execute("ALTER TABLE test DISABLE CHECK CONSTRAINT \"falsch\"")
+box.execute("ALTER TABLE test DISABLE CHECK CONSTRAINT \"some_ck\"")
+assert(box.space.TEST.ck_constraint.some_ck.is_enabled == false)
+box.space.TEST:insert({11})
+box.execute("ALTER TABLE test ENABLE CHECK CONSTRAINT \"some_ck\"")
+assert(box.space.TEST.ck_constraint.some_ck.is_enabled == true)
+box.space.TEST:insert({12})
+box.execute("DROP TABLE test;")
+
 test_run:cmd("clear filter")
-- 
GitLab