From 77d73e72b455b0f8a22460c79d1f5e54fc45ca6c Mon Sep 17 00:00:00 2001
From: Mergen Imeev <imeevma@gmail.com>
Date: Fri, 31 Aug 2018 15:50:17 +0300
Subject: [PATCH] sql: clean-up on failed CREATE TABLE

In case statement "CREATE TABLE ..." fails it can left some
records in system spaces that shouldn't be there. These records
won't be left behind after this patch.

@TarantoolBot document
Title: Clean up after failure of CREATE TABLE
Usually CREATE TABLE creates no less than two objects which are
space and index. If creation of index (or any object after space)
failed, created space (and other created objects) won't be deleted
though operation failed. Now these objects will be deleted
properly.

Closes #3592
---
 src/box/sql/build.c          | 101 +++++++++++++++++++++--
 src/box/sql/prepare.c        |   1 +
 src/box/sql/sqliteInt.h      |   5 ++
 src/box/sql/vdbe.c           |  22 ++---
 test/sql/drop-table.result   | 151 +++++++++++++++++++++++++++++++++++
 test/sql/drop-table.test.lua |  85 ++++++++++++++++++++
 6 files changed, 351 insertions(+), 14 deletions(-)

diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index fb5d61c210..a802c45050 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -55,6 +55,53 @@
 #include "box/tuple_format.h"
 #include "box/coll_id_cache.h"
 
+/**
+ * Structure that contains information about record that was
+ * inserted into system space.
+ */
+struct saved_record
+{
+	/** A link in a record list. */
+	struct rlist link;
+	/** Id of space in which the record was inserted. */
+	uint32_t space_id;
+	/** First register of the key of the record. */
+	int reg_key;
+	/** Number of registers the key consists of. */
+	int reg_key_count;
+	/** The number of the OP_SInsert operation. */
+	int insertion_opcode;
+};
+
+/**
+ * Save inserted in system space record in list.
+ *
+ * @param parser SQL Parser object.
+ * @param space_id Id of table in which record is inserted.
+ * @param reg_key Register that contains first field of the key.
+ * @param reg_key_count Exact number of fields of the key.
+ * @param insertion_opcode Number of OP_SInsert opcode.
+ */
+static inline void
+save_record(struct Parse *parser, uint32_t space_id, int reg_key,
+	    int reg_key_count, int insertion_opcode)
+{
+	struct saved_record *record =
+		region_alloc(&parser->region, sizeof(*record));
+	if (record == NULL) {
+		diag_set(OutOfMemory, sizeof(*record), "region_alloc",
+			 "record");
+		parser->rc = SQL_TARANTOOL_ERROR;
+		parser->nErr++;
+		return;
+	}
+	record->space_id = space_id;
+	record->reg_key = reg_key;
+	record->reg_key_count = reg_key_count;
+	record->insertion_opcode = insertion_opcode;
+	rlist_add_entry(&parser->record_list, record, link);
+}
+
 void
 sql_finish_coding(struct Parse *parse_context)
 {
@@ -62,6 +109,42 @@ sql_finish_coding(struct Parse *parse_context)
 	struct sqlite3 *db = parse_context->db;
 	struct Vdbe *v = sqlite3GetVdbe(parse_context);
 	sqlite3VdbeAddOp0(v, OP_Halt);
+	/*
+	 * In case statement "CREATE TABLE ..." fails it can
+	 * left some records in system spaces that shouldn't be
+	 * there. To clean-up properly this code is added. Last
+	 * record isn't deleted because if statement fails than
+	 * it won't be created. This code works the same way for
+	 * other "CREATE ..." statements but it won't delete
+	 * anything as these statements create no more than one
+	 * record.
+	 */
+	if (!rlist_empty(&parse_context->record_list)) {
+		struct saved_record *record =
+			rlist_shift_entry(&parse_context->record_list,
+					  struct saved_record, link);
+		/* Set P2 of SInsert. */
+		sqlite3VdbeChangeP2(v, record->insertion_opcode, v->nOp);
+		MAYBE_UNUSED const char *comment =
+			"Delete entry from %s if CREATE TABLE fails";
+		rlist_foreach_entry(record, &parse_context->record_list, link) {
+			int record_reg = ++parse_context->nMem;
+			sqlite3VdbeAddOp3(v, OP_MakeRecord, record->reg_key,
+					  record->reg_key_count, record_reg);
+			sqlite3VdbeAddOp2(v, OP_SDelete, record->space_id,
+					  record_reg);
+			MAYBE_UNUSED struct space *space =
+				space_by_id(record->space_id);
+			VdbeComment((v, comment, space_name(space)));
+			/* Set P2 of SInsert. */
+			sqlite3VdbeChangeP2(v, record->insertion_opcode,
+					    v->nOp);
+		}
+		sqlite3VdbeAddOp1(v, OP_Halt, SQL_TARANTOOL_ERROR);
+		VdbeComment((v,
+			     "Exit with an error if CREATE statement fails"));
+	}
+
 	if (db->mallocFailed || parse_context->nErr != 0) {
 		if (parse_context->rc == SQLITE_OK)
 			parse_context->rc = SQLITE_ERROR;
@@ -1101,13 +1184,14 @@ vdbe_emit_create_index(struct Parse *parse, struct space_def *def,
 	sqlite3VdbeAddOp4(v, OP_Blob, index_parts_sz, entry_reg + 5,
 			  SQL_SUBTYPE_MSGPACK, index_parts, P4_STATIC);
 	sqlite3VdbeAddOp3(v, OP_MakeRecord, entry_reg, 6, tuple_reg);
-	sqlite3VdbeAddOp2(v, OP_SInsert, BOX_INDEX_ID, tuple_reg);
+	sqlite3VdbeAddOp3(v, OP_SInsert, BOX_INDEX_ID, 0, tuple_reg);
 	/*
 	 * Non-NULL value means that index has been created via
 	 * separate CREATE INDEX statement.
 	 */
 	if (idx_def->opts.sql != NULL)
 		sqlite3VdbeChangeP5(v, OPFLAG_NCHANGE);
+	save_record(parse, BOX_INDEX_ID, entry_reg, 2, v->nOp - 1);
 	return;
 error:
 	parse->rc = SQL_TARANTOOL_ERROR;
@@ -1165,8 +1249,9 @@ createSpace(Parse * pParse, int iSpaceId, char *zStmt)
 	sqlite3VdbeAddOp4(v, OP_Blob, table_stmt_sz, iFirstCol + 6,
 			  SQL_SUBTYPE_MSGPACK, table_stmt, P4_STATIC);
 	sqlite3VdbeAddOp3(v, OP_MakeRecord, iFirstCol, 7, iRecord);
-	sqlite3VdbeAddOp2(v, OP_SInsert, BOX_SPACE_ID, iRecord);
+	sqlite3VdbeAddOp3(v, OP_SInsert, BOX_SPACE_ID, 0, iRecord);
 	sqlite3VdbeChangeP5(v, OPFLAG_NCHANGE);
+	save_record(pParse, BOX_SPACE_ID, iFirstCol, 1, v->nOp - 1);
 	return;
 error:
 	pParse->nErr++;
@@ -1340,9 +1425,11 @@ vdbe_emit_fkey_create(struct Parse *parse_context, const struct fkey_def *fk)
 			  parent_links, P4_DYNAMIC);
 	sqlite3VdbeAddOp3(vdbe, OP_MakeRecord, constr_tuple_reg, 9,
 			  constr_tuple_reg + 9);
-	sqlite3VdbeAddOp2(vdbe, OP_SInsert, BOX_FK_CONSTRAINT_ID,
+	sqlite3VdbeAddOp3(vdbe, OP_SInsert, BOX_FK_CONSTRAINT_ID, 0,
 			  constr_tuple_reg + 9);
 	sqlite3VdbeChangeP5(vdbe, OPFLAG_NCHANGE);
+	save_record(parse_context, BOX_FK_CONSTRAINT_ID, constr_tuple_reg, 2,
+		    vdbe->nOp - 1);
 	sqlite3ReleaseTempRange(parse_context, constr_tuple_reg, 10);
 	return;
 error:
@@ -1487,14 +1574,18 @@ sqlite3EndTable(Parse * pParse,	/* Parse context */
 		int reg_seq_record =
 			emitNewSysSequenceRecord(pParse, reg_seq_id,
 						 p->def->name);
-		sqlite3VdbeAddOp2(v, OP_SInsert, BOX_SEQUENCE_ID,
+		sqlite3VdbeAddOp3(v, OP_SInsert, BOX_SEQUENCE_ID, 0,
 				  reg_seq_record);
+		save_record(pParse, BOX_SEQUENCE_ID, reg_seq_record + 1, 1,
+			    v->nOp - 1);
 		/* Do an insertion into _space_sequence. */
 		int reg_space_seq_record =
 			emitNewSysSpaceSequenceRecord(pParse, reg_space_id,
 						      reg_seq_id);
-		sqlite3VdbeAddOp2(v, OP_SInsert, BOX_SPACE_SEQUENCE_ID,
+		sqlite3VdbeAddOp3(v, OP_SInsert, BOX_SPACE_SEQUENCE_ID, 0,
 				  reg_space_seq_record);
+		save_record(pParse, BOX_SPACE_SEQUENCE_ID,
+			    reg_space_seq_record + 1, 1, v->nOp - 1);
 	}
 	/* Code creation of FK constraints, if any. */
 	struct fkey_parse *fk_parse;
diff --git a/src/box/sql/prepare.c b/src/box/sql/prepare.c
index e98e84564e..a4b65ebe7c 100644
--- a/src/box/sql/prepare.c
+++ b/src/box/sql/prepare.c
@@ -274,6 +274,7 @@ sql_parser_create(struct Parse *parser, sqlite3 *db)
 	memset(parser, 0, sizeof(struct Parse));
 	parser->db = db;
 	rlist_create(&parser->new_fkey);
+	rlist_create(&parser->record_list);
 	region_create(&parser->region, &cord()->slabc);
 }
 
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index 744b660726..d8eb516451 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -2765,6 +2765,11 @@ struct Parse {
 	 * Foreign key constraint appeared in CREATE TABLE stmt.
 	 */
 	struct rlist new_fkey;
+	/**
+	 * List of all records that were inserted in system spaces
+	 * in current statement.
+	 */
+	struct rlist record_list;
 	bool initiateTTrans;	/* Initiate Tarantool transaction */
 	/** True, if table to be created has AUTOINCREMENT PK. */
 	bool is_new_table_autoinc;
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 10b58b4266..d6d6b7ed8f 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1010,8 +1010,12 @@ case OP_Halt: {
 	p->pc = pcx;
 	if (p->rc) {
 		if (p->rc == SQL_TARANTOOL_ERROR) {
-			assert(pOp->p4.z != NULL);
-			box_error_set(__FILE__, __LINE__, pOp->p5, pOp->p4.z);
+			if (pOp->p4.z == NULL) {
+				assert(! diag_is_empty(diag_get()));
+			} else {
+				box_error_set(__FILE__, __LINE__, pOp->p5,
+					      pOp->p4.z);
+			}
 		} else if (pOp->p5 != 0) {
 			static const char * const azType[] = { "NOT NULL", "UNIQUE", "CHECK",
 							       "FOREIGN KEY" };
@@ -4313,8 +4317,8 @@ case OP_IdxInsert: {
 	break;
 }
 
-/* Opcode: SInsert P1 P2 * * P5
- * Synopsis: space id = P1, key = r[P2]
+/* Opcode: SInsert P1 P2 P3 * P5
+ * Synopsis: space id = P1, key = r[P3], on error goto P2
  *
  * This opcode is used only during DDL routine.
  * In contrast to ordinary insertion, insertion to system spaces
@@ -4327,15 +4331,15 @@ case OP_IdxInsert: {
  */
 case OP_SInsert: {
 	assert(pOp->p1 > 0);
-	assert(pOp->p2 >= 0);
+	assert(pOp->p2 > 0);
+	assert(pOp->p3 >= 0);
 
-	pIn2 = &aMem[pOp->p2];
+	pIn3 = &aMem[pOp->p3];
 	struct space *space = space_by_id(pOp->p1);
 	assert(space != NULL);
 	assert(space_is_system(space));
-	rc = tarantoolSqlite3Insert(space, pIn2->z, pIn2->z + pIn2->n);
-	if (rc)
-		goto abort_due_to_error;
+	if (tarantoolSqlite3Insert(space, pIn3->z, pIn3->z + pIn3->n) != 0)
+		goto jump_to_p2;
 	if (pOp->p5 & OPFLAG_NCHANGE)
 		p->nChange++;
 	break;
diff --git a/test/sql/drop-table.result b/test/sql/drop-table.result
index 08f249668b..297799e48f 100644
--- a/test/sql/drop-table.result
+++ b/test/sql/drop-table.result
@@ -33,3 +33,154 @@ box.sql.execute("INSERT INTO zzzoobar VALUES (111, 222, 'c3', 444)")
 -- DROP TABLE should do the job
 -- Debug
 -- require("console").start()
+--
+-- gh-3592: clean-up garbage on failed CREATE TABLE statement.
+--
+-- Let user have enough rights to create space, but not enough to
+-- create index.
+--
+box.schema.user.create('tmp')
+---
+...
+box.schema.user.grant('tmp', 'create, read', 'universe')
+---
+...
+box.schema.user.grant('tmp', 'write', 'space', '_space')
+---
+...
+box.schema.user.grant('tmp', 'write', 'space', '_schema')
+---
+...
+-- Number of records in _space, _index, _sequence:
+space_count = #box.space._space:select()
+---
+...
+index_count = #box.space._index:select()
+---
+...
+sequence_count = #box.space._sequence:select()
+---
+...
+box.session.su('tmp')
+---
+...
+--
+-- Error: user do not have rights to write in box.space._index.
+-- Space that was already created should be automatically dropped.
+--
+box.sql.execute('CREATE TABLE t1 (id INT PRIMARY KEY, a INT)')
+---
+- error: Write access to space '_index' is denied for user 'tmp'
+...
+-- Error: no such table.
+box.sql.execute('DROP TABLE t1')
+---
+- error: 'no such table: T1'
+...
+box.session.su('admin')
+---
+...
+--
+-- Check that _space, _index and _sequence have the same number of
+-- records.
+--
+space_count == #box.space._space:select()
+---
+- true
+...
+index_count == #box.space._index:select()
+---
+- true
+...
+sequence_count == #box.space._sequence:select()
+---
+- true
+...
+--
+-- Give user right to write in _index. Still have not enough
+-- rights to write in _sequence.
+--
+box.schema.user.grant('tmp', 'write', 'space', '_index')
+---
+...
+box.session.su('tmp')
+---
+...
+--
+-- Error: user do not have rights to write in _sequence.
+--
+box.sql.execute('CREATE TABLE t2 (id INT PRIMARY KEY AUTOINCREMENT, a UNIQUE, b UNIQUE, c UNIQUE, d UNIQUE)')
+---
+- error: Write access to space '_sequence' is denied for user 'tmp'
+...
+box.session.su('admin')
+---
+...
+--
+-- Check that _space, _index and _sequence have the same number of
+-- records.
+--
+space_count == #box.space._space:select()
+---
+- true
+...
+index_count == #box.space._index:select()
+---
+- true
+...
+sequence_count == #box.space._sequence:select()
+---
+- true
+...
+fk_constraint_count = #box.space._fk_constraint:select()
+---
+...
+--
+-- Check that clean-up works fine after another error.
+--
+box.schema.user.grant('tmp', 'write', 'space')
+---
+...
+box.session.su('tmp')
+---
+...
+box.sql.execute('CREATE TABLE t3(a INTEGER PRIMARY KEY);')
+---
+...
+--
+-- Error: Failed to create foreign key constraint.
+--
+box.sql.execute('CREATE TABLE t4(x INTEGER PRIMARY KEY REFERENCES t3, a INT UNIQUE, c INT REFERENCES t3);')
+---
+- error: 'Failed to create foreign key constraint ''FK_CONSTRAINT_2_T4'': field type
+    mismatch'
+...
+box.sql.execute('DROP TABLE t3;')
+---
+...
+--
+-- Check that _space, _index and _sequence have the same number of
+-- records.
+--
+space_count == #box.space._space:select()
+---
+- true
+...
+index_count == #box.space._index:select()
+---
+- true
+...
+sequence_count == #box.space._sequence:select()
+---
+- true
+...
+fk_constraint_count == #box.space._fk_constraint:select()
+---
+- true
+...
+box.session.su('admin')
+---
+...
+box.schema.user.drop('tmp')
+---
+...
diff --git a/test/sql/drop-table.test.lua b/test/sql/drop-table.test.lua
index 9663074df9..b1c5253457 100644
--- a/test/sql/drop-table.test.lua
+++ b/test/sql/drop-table.test.lua
@@ -25,3 +25,88 @@ box.sql.execute("INSERT INTO zzzoobar VALUES (111, 222, 'c3', 444)")
 
 -- Debug
 -- require("console").start()
+
+--
+-- gh-3592: clean-up garbage on failed CREATE TABLE statement.
+--
+-- Let user have enough rights to create space, but not enough to
+-- create index.
+--
+box.schema.user.create('tmp')
+box.schema.user.grant('tmp', 'create, read', 'universe')
+box.schema.user.grant('tmp', 'write', 'space', '_space')
+box.schema.user.grant('tmp', 'write', 'space', '_schema')
+
+-- Number of records in _space, _index, _sequence:
+space_count = #box.space._space:select()
+index_count = #box.space._index:select()
+sequence_count = #box.space._sequence:select()
+
+box.session.su('tmp')
+--
+-- Error: user do not have rights to write in box.space._index.
+-- Space that was already created should be automatically dropped.
+--
+box.sql.execute('CREATE TABLE t1 (id INT PRIMARY KEY, a INT)')
+-- Error: no such table.
+box.sql.execute('DROP TABLE t1')
+
+box.session.su('admin')
+
+--
+-- Check that _space, _index and _sequence have the same number of
+-- records.
+--
+space_count == #box.space._space:select()
+index_count == #box.space._index:select()
+sequence_count == #box.space._sequence:select()
+
+--
+-- Give user right to write in _index. Still have not enough
+-- rights to write in _sequence.
+--
+box.schema.user.grant('tmp', 'write', 'space', '_index')
+box.session.su('tmp')
+
+--
+-- Error: user do not have rights to write in _sequence.
+--
+box.sql.execute('CREATE TABLE t2 (id INT PRIMARY KEY AUTOINCREMENT, a UNIQUE, b UNIQUE, c UNIQUE, d UNIQUE)')
+
+box.session.su('admin')
+
+--
+-- Check that _space, _index and _sequence have the same number of
+-- records.
+--
+space_count == #box.space._space:select()
+index_count == #box.space._index:select()
+sequence_count == #box.space._sequence:select()
+
+fk_constraint_count = #box.space._fk_constraint:select()
+
+--
+-- Check that clean-up works fine after another error.
+--
+box.schema.user.grant('tmp', 'write', 'space')
+box.session.su('tmp')
+
+box.sql.execute('CREATE TABLE t3(a INTEGER PRIMARY KEY);')
+--
+-- Error: Failed to create foreign key constraint.
+--
+box.sql.execute('CREATE TABLE t4(x INTEGER PRIMARY KEY REFERENCES t3, a INT UNIQUE, c INT REFERENCES t3);')
+box.sql.execute('DROP TABLE t3;')
+
+--
+-- Check that _space, _index and _sequence have the same number of
+-- records.
+--
+space_count == #box.space._space:select()
+index_count == #box.space._index:select()
+sequence_count == #box.space._sequence:select()
+fk_constraint_count == #box.space._fk_constraint:select()
+
+box.session.su('admin')
+
+box.schema.user.drop('tmp')
-- 
GitLab