From 3846d9b2d4d43e9b4c777b1959a2240b3437bb15 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov.dev@gmail.com>
Date: Wed, 10 Oct 2018 18:12:06 +0300
Subject: [PATCH] vinyl: bypass format validation for statements loaded from
 disk

When the format of a space is altered, we walk over all tuples stored in
the primary index and check them against the new format. This doesn't
guarantee that all *statements* stored in the primary index conform to
the new format though, because the check isn't performed for deleted or
overwritten statements, e.g.

  s = box.schema.space.create('test', {engine = 'vinyl'})
  s:create_index('primary')
  s:insert{1}
  box.snapshot()
  s:delete{1}

  -- The following command will succeed, because the space is empty,
  -- however one of the runs contains REPLACE{1}, which doesn't conform
  -- to the new format.
  s:create_index('secondary', {parts = {2, 'unsigned'}})

This is OK as we will never return such overwritten statements to the
user, however we may still need to read them. Currently, this leads
either to an assertion failure or to a read error in

  vy_stmt_decode
   vy_stmt_new_with_ops
    tuple_init_field_map

We could probably force major compaction of the primary index to purge
such statements, but it is complicated as there may be a read view
preventing the write iterator from squashing such a statement, and
currently there's no way to force destruction of a read view.

So this patch simply disables format validation for all tuples loaded
from disk (actually we already skip format validation for all secondary
index statements and for DELETE statements in primary indexes so this
isn't as bad as it may seem). To do that, it adds a boolean parameter to
tuple_init_field_map() that disables format validation, and then makes
vy_stmt_new_with_ops(), which is used for constructing vinyl statements,
set it to false. This is OK as all statements inserted into a vinyl
space are validated explicitly with tuple_validate() anyway.

This is rather a workaround for the lack of a better solution.

Closes #3540
---
 src/box/memtx_engine.c  |  2 +-
 src/box/tuple.c         |  2 +-
 src/box/tuple_format.c  | 16 ++++++++++------
 src/box/tuple_format.h  |  3 ++-
 src/box/vy_stmt.c       | 23 ++++++++++++++++++-----
 test/vinyl/ddl.result   | 39 +++++++++++++++++++++++++++++++++++++++
 test/vinyl/ddl.test.lua | 15 +++++++++++++++
 7 files changed, 86 insertions(+), 14 deletions(-)

diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index ae1f5a0ea4..5a5e87e64f 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -1170,7 +1170,7 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
 	char *raw = (char *) tuple + tuple->data_offset;
 	uint32_t *field_map = (uint32_t *) raw;
 	memcpy(raw, data, tuple_len);
-	if (tuple_init_field_map(format, field_map, raw)) {
+	if (tuple_init_field_map(format, field_map, raw, true)) {
 		memtx_tuple_delete(format, tuple);
 		return NULL;
 	}
diff --git a/src/box/tuple.c b/src/box/tuple.c
index d7dbad3015..c975e539ef 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -115,7 +115,7 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end
 	char *raw = (char *) tuple + tuple->data_offset;
 	uint32_t *field_map = (uint32_t *) raw;
 	memcpy(raw, data, data_len);
-	if (tuple_init_field_map(format, field_map, raw)) {
+	if (tuple_init_field_map(format, field_map, raw, true)) {
 		runtime_tuple_delete(format, tuple);
 		return NULL;
 	}
diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
index b385c0d99d..5f4899d9df 100644
--- a/src/box/tuple_format.c
+++ b/src/box/tuple_format.c
@@ -349,7 +349,7 @@ tuple_format_dup(struct tuple_format *src)
 /** @sa declaration for details. */
 int
 tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
-		     const char *tuple)
+		     const char *tuple, bool validate)
 {
 	if (format->field_count == 0)
 		return 0; /* Nothing to initialize */
@@ -358,14 +358,14 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
 
 	/* Check to see if the tuple has a sufficient number of fields. */
 	uint32_t field_count = mp_decode_array(&pos);
-	if (format->exact_field_count > 0 &&
+	if (validate && format->exact_field_count > 0 &&
 	    format->exact_field_count != field_count) {
 		diag_set(ClientError, ER_EXACT_FIELD_COUNT,
 			 (unsigned) field_count,
 			 (unsigned) format->exact_field_count);
 		return -1;
 	}
-	if (unlikely(field_count < format->min_field_count)) {
+	if (validate && field_count < format->min_field_count) {
 		diag_set(ClientError, ER_MIN_FIELD_COUNT,
 			 (unsigned) field_count,
 			 (unsigned) format->min_field_count);
@@ -375,14 +375,17 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
 	/* first field is simply accessible, so we do not store offset to it */
 	enum mp_type mp_type = mp_typeof(*pos);
 	const struct tuple_field *field = &format->fields[0];
-	if (key_mp_type_validate(field->type, mp_type, ER_FIELD_TYPE,
+	if (validate &&
+	    key_mp_type_validate(field->type, mp_type, ER_FIELD_TYPE,
 				 TUPLE_INDEX_BASE, field->is_nullable))
 		return -1;
 	mp_next(&pos);
 	/* other fields...*/
 	++field;
 	uint32_t i = 1;
-	uint32_t defined_field_count = MIN(field_count, format->field_count);
+	uint32_t defined_field_count = MIN(field_count, validate ?
+					   format->field_count :
+					   format->index_field_count);
 	if (field_count < format->index_field_count) {
 		/*
 		 * Nullify field map to be able to detect by 0,
@@ -393,7 +396,8 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
 	}
 	for (; i < defined_field_count; ++i, ++field) {
 		mp_type = mp_typeof(*pos);
-		if (key_mp_type_validate(field->type, mp_type, ER_FIELD_TYPE,
+		if (validate &&
+		    key_mp_type_validate(field->type, mp_type, ER_FIELD_TYPE,
 					 i + TUPLE_INDEX_BASE,
 					 field->is_nullable))
 			return -1;
diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h
index a9a9c3015e..344aada7ef 100644
--- a/src/box/tuple_format.h
+++ b/src/box/tuple_format.h
@@ -309,6 +309,7 @@ box_tuple_format_unref(box_tuple_format_t *format);
  * @param field_map A pointer behind the last element of the field
  *                  map.
  * @param tuple     MessagePack array.
+ * @param validate  If set, validate the tuple against the format.
  *
  * @retval  0 Success.
  * @retval -1 Format error.
@@ -321,7 +322,7 @@ box_tuple_format_unref(box_tuple_format_t *format);
  */
 int
 tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
-		     const char *tuple);
+		     const char *tuple, bool validate);
 
 /**
  * Get a field at the specific position in this MessagePack array.
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 08b8fb68a2..ebe64300b8 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -75,6 +75,9 @@ vy_stmt_persistent_flags(const struct tuple *stmt, bool is_primary)
 static struct tuple *
 vy_tuple_new(struct tuple_format *format, const char *data, const char *end)
 {
+	if (tuple_validate_raw(format, data) != 0)
+		return NULL;
+
 	return vy_stmt_new_insert(format, data, end);
 }
 
@@ -264,9 +267,7 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 	mp_tuple_assert(tuple_begin, tuple_end);
 
 	const char *tmp = tuple_begin;
-	uint32_t field_count = mp_decode_array(&tmp);
-	assert(field_count >= format->min_field_count);
-	(void) field_count;
+	mp_decode_array(&tmp);
 
 	size_t ops_size = 0;
 	for (int i = 0; i < op_count; ++i)
@@ -293,8 +294,20 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 	}
 	vy_stmt_set_type(stmt, type);
 
-	/* Calculate offsets for key parts */
-	if (tuple_init_field_map(format, (uint32_t *) raw, raw)) {
+	/*
+	 * Calculate offsets for key parts.
+	 *
+	 * Note, an overwritten statement loaded from a primary
+	 * index run file may not conform to the current format
+	 * in case the space was altered (e.g. a new field was
+	 * added which is missing in a deleted tuple). Although
+	 * we should never return such statements to the user,
+	 * we may still need to decode them while iterating over
+	 * a run so we skip tuple validation here. This is OK as
+	 * tuples inserted into a space are validated explicitly
+	 * with tuple_validate() anyway.
+	 */
+	if (tuple_init_field_map(format, (uint32_t *) raw, raw, false)) {
 		tuple_unref(stmt);
 		return NULL;
 	}
diff --git a/test/vinyl/ddl.result b/test/vinyl/ddl.result
index 06f9d4e996..dbffc3e5b0 100644
--- a/test/vinyl/ddl.result
+++ b/test/vinyl/ddl.result
@@ -621,6 +621,45 @@ s:drop()
 ---
 ...
 --
+-- gh-3540 assertion after ALTER when reading an overwritten
+-- statement that doesn't match the new space format.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('primary', {run_count_per_level = 10})
+---
+...
+s:replace{1}
+---
+- [1]
+...
+s:replace{2, 'a'}
+---
+- [2, 'a']
+...
+box.snapshot()
+---
+- ok
+...
+s:replace{1, 1}
+---
+- [1, 1]
+...
+s:delete{2}
+---
+...
+_ = s:create_index('secondary', {parts = {2, 'unsigned'}})
+---
+...
+s:select()
+---
+- - [1, 1]
+...
+s:drop()
+---
+...
+--
 -- Check that all modifications done to the space during index build
 -- are reflected in the new index.
 --
diff --git a/test/vinyl/ddl.test.lua b/test/vinyl/ddl.test.lua
index c8c7861917..7c50404662 100644
--- a/test/vinyl/ddl.test.lua
+++ b/test/vinyl/ddl.test.lua
@@ -227,6 +227,21 @@ pk:alter{parts = {1, 'unsigned'}}
 s:replace{-1}
 s:drop()
 
+--
+-- gh-3540 assertion after ALTER when reading an overwritten
+-- statement that doesn't match the new space format.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('primary', {run_count_per_level = 10})
+s:replace{1}
+s:replace{2, 'a'}
+box.snapshot()
+s:replace{1, 1}
+s:delete{2}
+_ = s:create_index('secondary', {parts = {2, 'unsigned'}})
+s:select()
+s:drop()
+
 --
 -- Check that all modifications done to the space during index build
 -- are reflected in the new index.
-- 
GitLab