From deea069324f141c6ef4f80b58ba828a0aef9ce91 Mon Sep 17 00:00:00 2001
From: Nikita Pettik <korablev@tarantool.org>
Date: Fri, 13 Jul 2018 02:15:41 +0300
Subject: [PATCH] sql: display error on FK creation and drop failure

Before insertion to _fk_constraint we must be sure that there is no
entry with given <name, child id>. Otherwise, insertion will fail and
'duplicate key' will be shown. Such error message doesn't seem to be
informative enough, so lets verify before insertion to _fk_constraint
that it doesn't already contain entry with given name.
The same is for dropping constraint, but here vice versa: we test
that _fk_constraint contains entry with given name and child id.

It is worth mentioning that during CREATE TABLE processing schema id
changes and check in OP_OpenRead opcode fails (which in turn shows that
pointer to space may expire). On the other hand, _fk_constraint space
itself remains immutable, so as a temporary workaround lets use flag
indicating pointer to system space passed to OP_OpenRead. It makes
possible to use pointer to space, even if schema has changed.

Closes #3271
---
 src/box/errcode.h            |  2 ++
 src/box/sql/build.c          | 52 ++++++++++++++++++++++++------------
 src/box/sql/sqliteInt.h      | 10 ++++---
 src/box/sql/trigger.c        | 15 +++++++++--
 src/box/sql/vdbe.c           |  3 ++-
 test/box/misc.result         |  2 ++
 test/sql-tap/alter2.test.lua | 25 ++++++++++++++++-
 7 files changed, 85 insertions(+), 24 deletions(-)

diff --git a/src/box/errcode.h b/src/box/errcode.h
index 213a1864bb..3000f76da2 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -221,6 +221,8 @@ struct errcode_record {
 	/*166 */_(ER_NO_SUCH_COLLATION,		"Collation '%s' does not exist") \
 	/*167 */_(ER_CREATE_FK_CONSTRAINT,	"Failed to create foreign key constraint '%s': %s") \
 	/*168 */_(ER_DROP_FK_CONSTRAINT,	"Failed to drop foreign key constraint '%s': %s") \
+	/*169 */_(ER_NO_SUCH_CONSTRAINT,	"Constraint %s does not exist") \
+	/*170 */_(ER_CONSTRAINT_EXISTS,		"Constraint %s already exists") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index cb232b12db..cdf2bfcc9c 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -1580,6 +1580,20 @@ vdbe_emit_fkey_create(struct Parse *parse_context, const struct fkey_def *fk)
 		sqlite3VdbeAddOp2(vdbe, OP_Integer, fk->parent_id,
 				  constr_tuple_reg + 2);
 	}
+	/*
+	 * Lets check that constraint with this name hasn't
+	 * been created before.
+	 */
+	const char *error_msg =
+		tt_sprintf(tnt_errcode_desc(ER_CONSTRAINT_EXISTS), name_copy);
+	if (vdbe_emit_halt_with_presence_test(parse_context,
+					      BOX_FK_CONSTRAINT_ID, 0,
+					      constr_tuple_reg, 2,
+					      ER_CONSTRAINT_EXISTS, error_msg,
+					      false, OP_NoConflict) != 0) {
+		sqlite3DbFree(parse_context->db, name_copy);
+		return;
+	}
 	sqlite3VdbeAddOp2(vdbe, OP_Bool, 0, constr_tuple_reg + 3);
 	sqlite3VdbeChangeP4(vdbe, -1, (char*)&fk->is_deferred, P4_BOOL);
 	sqlite3VdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg + 4, 0,
@@ -2028,7 +2042,7 @@ sql_clear_stat_spaces(struct Parse *parse, const char *table_name,
  * @param child_id Id of table which constraint belongs to.
  */
 static void
-vdbe_emit_fkey_drop(struct Parse *parse_context, const char *constraint_name,
+vdbe_emit_fkey_drop(struct Parse *parse_context, char *constraint_name,
 		    uint32_t child_id)
 {
 	struct Vdbe *vdbe = sqlite3GetVdbe(parse_context);
@@ -2037,6 +2051,17 @@ vdbe_emit_fkey_drop(struct Parse *parse_context, const char *constraint_name,
 	sqlite3VdbeAddOp4(vdbe, OP_String8, 0, key_reg, 0, constraint_name,
 			  P4_DYNAMIC);
 	sqlite3VdbeAddOp2(vdbe, OP_Integer, child_id,  key_reg + 1);
+	const char *error_msg =
+		tt_sprintf(tnt_errcode_desc(ER_NO_SUCH_CONSTRAINT),
+			   constraint_name);
+	if (vdbe_emit_halt_with_presence_test(parse_context,
+					      BOX_FK_CONSTRAINT_ID, 0,
+					      key_reg, 2, ER_NO_SUCH_CONSTRAINT,
+					      error_msg, false,
+					      OP_Found) != 0) {
+		sqlite3DbFree(parse_context->db, constraint_name);
+		return;
+	}
 	sqlite3VdbeAddOp3(vdbe, OP_MakeRecord, key_reg, 2, key_reg + 2);
 	sqlite3VdbeAddOp2(vdbe, OP_SDelete, BOX_FK_CONSTRAINT_ID, key_reg + 2);
 	sqlite3VdbeChangeP5(vdbe, OPFLAG_NCHANGE);
@@ -2102,8 +2127,7 @@ sql_code_drop_table(struct Parse *parse_context, struct space *space,
 	/* Delete all child FK constraints. */
 	struct fkey *child_fk;
 	rlist_foreach_entry (child_fk, &space->child_fkey, child_link) {
-		const char *fk_name_dup = sqlite3DbStrDup(v->db,
-							  child_fk->def->name);
+		char *fk_name_dup = sqlite3DbStrDup(v->db, child_fk->def->name);
 		if (fk_name_dup == NULL)
 			return;
 		vdbe_emit_fkey_drop(parse_context, fk_name_dup, space_id);
@@ -2503,8 +2527,8 @@ sql_drop_foreign_key(struct Parse *parse_context, struct SrcList *table,
 		parse_context->nErr++;
 		return;
 	}
-	const char *constraint_name = sqlite3NameFromToken(parse_context->db,
-							   constraint);
+	char *constraint_name = sqlite3NameFromToken(parse_context->db,
+						     constraint);
 	if (constraint_name != NULL)
 		vdbe_emit_fkey_drop(parse_context, constraint_name, child_id);
 }
@@ -4004,7 +4028,7 @@ sqlite3WithDelete(sqlite3 * db, With * pWith)
 
 int
 vdbe_emit_halt_with_presence_test(struct Parse *parser, int space_id,
-				  int index_id, const char *name_src,
+				  int index_id, int key_reg, uint32_t key_len,
 				  int tarantool_error_code,
 				  const char *error_src, bool no_error,
 				  int cond_opcode)
@@ -4014,22 +4038,16 @@ vdbe_emit_halt_with_presence_test(struct Parse *parser, int space_id,
 	assert(v != NULL);
 
 	struct sqlite3 *db = parser->db;
-	char *name = sqlite3DbStrDup(db, name_src);
-	if (name == NULL)
-		return -1;
 	char *error = sqlite3DbStrDup(db, error_src);
-	if (error == NULL) {
-		sqlite3DbFree(db, name);
+	if (error == NULL)
 		return -1;
-	}
 
 	int cursor = parser->nTab++;
 	vdbe_emit_open_cursor(parser, cursor, index_id, space_by_id(space_id));
-
-	int name_reg = ++parser->nMem;
-	int label = sqlite3VdbeAddOp4(v, OP_String8, 0, name_reg, 0, name,
-				      P4_DYNAMIC);
-	sqlite3VdbeAddOp4Int(v, cond_opcode, cursor, label + 3, name_reg, 1);
+	sqlite3VdbeChangeP5(v, OPFLAG_SYSTEMSP);
+	int label = sqlite3VdbeCurrentAddr(v);
+	sqlite3VdbeAddOp4Int(v, cond_opcode, cursor, label + 3, key_reg,
+			     key_len);
 	if (no_error) {
 		sqlite3VdbeAddOp0(v, OP_Halt);
 	} else {
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index 659c825e9c..189d161503 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -2943,6 +2943,9 @@ struct Parse {
 #define OPFLAG_NOOP_IF_NULL  0x02	/* OP_FCopy: if source register is NULL
 					 * then do nothing
 					 */
+#define OPFLAG_SYSTEMSP      0x20	/* OP_Open**: set if space pointer
+					 * points to system space.
+					 */
 
 /*
  * Each trigger present in the database schema is stored as an
@@ -4863,7 +4866,7 @@ table_column_nullable_action(struct Table *tab, uint32_t column);
 
 /**
  * Generate VDBE code to halt execution with correct error if
- * the object with specified name is already present (or doesn't
+ * the object with specified key is already present (or doesn't
  * present - configure with cond_opcodeq) in specified space.
  * The function allocates error and name resources for VDBE
  * itself.
@@ -4871,7 +4874,8 @@ table_column_nullable_action(struct Table *tab, uint32_t column);
  * @param parser Parsing context.
  * @param space_id Space to lookup identifier.
  * @param index_id Index identifier of key.
- * @param name_src Name of object to test on existence.
+ * @param key_reg Register where key to be found is held.
+ * @param key_len Lenght of key (number of resiters).
  * @param tarantool_error_code to set on halt.
  * @param error_src Error message to display on VDBE halt.
  * @param no_error Do not raise error flag.
@@ -4883,7 +4887,7 @@ table_column_nullable_action(struct Table *tab, uint32_t column);
  */
 int
 vdbe_emit_halt_with_presence_test(struct Parse *parser, int space_id,
-				  int index_id, const char *name_src,
+				  int index_id, int key_reg, uint32_t key_len,
 				  int tarantool_error_code,
 				  const char *error_src, bool no_error,
 				  int cond_opcode);
diff --git a/src/box/sql/trigger.c b/src/box/sql/trigger.c
index a7b84c39ff..bd730c401f 100644
--- a/src/box/sql/trigger.c
+++ b/src/box/sql/trigger.c
@@ -114,8 +114,14 @@ sql_trigger_begin(struct Parse *parse, struct Token *name, int tr_tm,
 		const char *error_msg =
 			tt_sprintf(tnt_errcode_desc(ER_TRIGGER_EXISTS),
 				   trigger_name);
+		char *name_copy = sqlite3DbStrDup(db, trigger_name);
+		if (name_copy == NULL)
+			goto trigger_cleanup;
+		int name_reg = ++parse->nMem;
+		sqlite3VdbeAddOp4(parse->pVdbe, OP_String8, 0, name_reg, 0,
+				  name_copy, P4_DYNAMIC);
 		if (vdbe_emit_halt_with_presence_test(parse, BOX_TRIGGER_ID, 0,
-						      trigger_name,
+						      name_reg, 1,
 						      ER_TRIGGER_EXISTS,
 						      error_msg, (no_err != 0),
 						      OP_NoConflict) != 0)
@@ -458,8 +464,13 @@ sql_drop_trigger(struct Parse *parser, struct SrcList *name, bool no_err)
 	const char *error_msg =
 		tt_sprintf(tnt_errcode_desc(ER_NO_SUCH_TRIGGER),
 			   trigger_name);
+	char *name_copy = sqlite3DbStrDup(db, trigger_name);
+	if (name_copy == NULL)
+		goto drop_trigger_cleanup;
+	int name_reg = ++parser->nMem;
+	sqlite3VdbeAddOp4(v, OP_String8, 0, name_reg, 0, name_copy, P4_DYNAMIC);
 	if (vdbe_emit_halt_with_presence_test(parser, BOX_TRIGGER_ID, 0,
-					      trigger_name, ER_NO_SUCH_TRIGGER,
+					      name_reg, 1, ER_NO_SUCH_TRIGGER,
 					      error_msg, no_err, OP_Found) != 0)
 		goto drop_trigger_cleanup;
 
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 6449d5c0ce..78900c773d 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -3092,7 +3092,8 @@ case OP_OpenRead:
 case OP_OpenWrite:
 
 	assert(pOp->opcode==OP_OpenWrite || pOp->p5==0 || pOp->p5==OPFLAG_SEEKEQ);
-	if (box_schema_version() != p->schema_ver) {
+	if (box_schema_version() != p->schema_ver &&
+	    (pOp->p5 & OPFLAG_SYSTEMSP) == 0) {
 		p->expired = 1;
 		rc = SQLITE_ERROR;
 		sqlite3VdbeError(p, "schema version has changed: " \
diff --git a/test/box/misc.result b/test/box/misc.result
index a680f752e6..6e1bd70138 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -493,6 +493,8 @@ t;
   166: box.error.NO_SUCH_COLLATION
   167: box.error.CREATE_FK_CONSTRAINT
   168: box.error.DROP_FK_CONSTRAINT
+  169: box.error.NO_SUCH_CONSTRAINT
+  170: box.error.CONSTRAINT_EXISTS
 ...
 test_run:cmd("setopt delimiter ''");
 ---
diff --git a/test/sql-tap/alter2.test.lua b/test/sql-tap/alter2.test.lua
index 94a61ebf46..f231c602a1 100755
--- a/test/sql-tap/alter2.test.lua
+++ b/test/sql-tap/alter2.test.lua
@@ -1,6 +1,6 @@
 #!/usr/bin/env tarantool
 test = require("sqltester")
-test:plan(18)
+test:plan(20)
 
 -- This suite is aimed to test ALTER TABLE ADD CONSTRAINT statement.
 --
@@ -226,4 +226,27 @@ test:do_catchsql_test(
         -- </alter2-4.2>
     })
 
+test:do_catchsql_test(
+    "alter2-5.1",
+    [[
+        DROP TABLE child;
+        CREATE TABLE child (id PRIMARY KEY, a UNIQUE);
+        ALTER TABLE child ADD CONSTRAINT fk FOREIGN KEY (id) REFERENCES child;
+        ALTER TABLE child ADD CONSTRAINT fk FOREIGN KEY (a) REFERENCES child;
+    ]], {
+        -- <alter2-5.1>
+        1, "Constraint FK already exists"
+        -- </alter2-5.1>
+    })
+
+test:do_catchsql_test(
+    "alter2-5.2",
+    [[
+        ALTER TABLE child DROP CONSTRAINT fake;
+    ]], {
+        -- <alter2-5.2>
+        1, "Constraint FAKE does not exist"
+        -- </alter2-5.2>
+    })
+
 test:finish_test()
-- 
GitLab