From f909b656830a86406069de6c04ce7b0d9d4b6119 Mon Sep 17 00:00:00 2001
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date: Fri, 8 Dec 2017 00:16:25 +0300
Subject: [PATCH] tuple: allow omitting tail nullable index columns

If a column is nullable and is the last defined one (via index parts
or space format), it can be omitted on insertion. Such absent fields
are treated as NULLs in comparators and are not stored.
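
For example (a minimal sketch mirroring the new cases in
test/engine/null.test.lua; the space and index names are
illustrative):

    s = box.schema.space.create('test')
    pk = s:create_index('pk')
    sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
    s:replace{1}        -- the second field is absent and not stored
    sk:select{box.NULL} -- returns [1]: the absent field compares as NULL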

Closes #2988
---
 src/box/alter.cc             |  81 +++-
 src/box/index_def.h          |  13 +
 src/box/key_def.cc           |  20 +
 src/box/key_def.h            |  15 +
 src/box/tuple_compare.cc     | 170 +++++++--
 src/box/tuple_extract_key.cc | 206 ++++++++---
 src/box/tuple_format.c       |  37 +-
 src/box/tuple_format.h       |  25 +-
 src/box/tuple_hash.cc        |  36 +-
 src/box/vy_stmt.c            |  21 +-
 src/box/vy_stmt.h            |   2 +-
 test/engine/null.result      | 689 +++++++++++++++++++++++++++++++++++
 test/engine/null.test.lua    | 184 ++++++++++
 13 files changed, 1393 insertions(+), 106 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index d7191c1bba..5749740d21 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -661,6 +661,12 @@ struct alter_space {
 	 * substantially.
 	 */
 	struct key_def *pk_def;
+	/**
+	 * Min field count of a new space. It is calculated before
+	 * the new space is created and used to update optionality
+	 * of key_defs and key_parts.
+	 */
+	uint32_t new_min_field_count;
 };
 
 static struct alter_space *
@@ -671,6 +677,10 @@ alter_space_new(struct space *old_space)
 	rlist_create(&alter->ops);
 	alter->old_space = old_space;
 	alter->space_def = space_def_dup_xc(alter->old_space->def);
+	if (old_space->format != NULL)
+		alter->new_min_field_count = old_space->format->min_field_count;
+	else
+		alter->new_min_field_count = 0;
 	return alter;
 }
 
@@ -1359,15 +1369,32 @@ alter_space_move_indexes(struct alter_space *alter, uint32_t begin,
 			 uint32_t end)
 {
 	struct space *old_space = alter->old_space;
+	bool is_min_field_count_changed;
+	if (old_space->format != NULL) {
+		is_min_field_count_changed =
+			old_space->format->min_field_count !=
+			alter->new_min_field_count;
+	} else {
+		is_min_field_count_changed = false;
+	}
 	for (uint32_t index_id = begin; index_id < end; ++index_id) {
 		struct index *old_index = space_index(old_space, index_id);
 		if (old_index == NULL)
 			continue;
 		struct index_def *old_def = old_index->def;
+		struct index_def *new_def;
+		uint32_t min_field_count = alter->new_min_field_count;
 		if ((old_def->opts.is_unique &&
 		     !old_def->key_def->is_nullable) ||
 		    old_def->type != TREE || alter->pk_def == NULL) {
-			(void) new MoveIndex(alter, old_def->iid);
+			if (is_min_field_count_changed) {
+				new_def = index_def_dup(old_def);
+				index_def_update_optionality(new_def,
+							     min_field_count);
+				(void) new ModifyIndex(alter, new_def, old_def);
+			} else {
+				(void) new MoveIndex(alter, old_def->iid);
+			}
 			continue;
 		}
 		/*
@@ -1375,11 +1402,11 @@ alter_space_move_indexes(struct alter_space *alter, uint32_t begin,
 		 * the primary, since primary key parts have
 		 * changed.
 		 */
-		struct index_def *new_def =
-			index_def_new(old_def->space_id, old_def->iid,
-				      old_def->name, strlen(old_def->name),
-				      old_def->type, &old_def->opts,
-				      old_def->key_def, alter->pk_def);
+		new_def = index_def_new(old_def->space_id, old_def->iid,
+					old_def->name, strlen(old_def->name),
+					old_def->type, &old_def->opts,
+					old_def->key_def, alter->pk_def);
+		index_def_update_optionality(new_def, min_field_count);
 		auto guard = make_scoped_guard([=] { index_def_delete(new_def); });
 		(void) new RebuildIndex(alter, new_def, old_def);
 		guard.is_active = false;
@@ -1670,16 +1697,19 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
 	 * First, move all unchanged indexes from the old space
 	 * to the new one.
 	 */
-	alter_space_move_indexes(alter, 0, iid);
 	/* Case 1: drop the index, if it is dropped. */
 	if (old_index != NULL && new_tuple == NULL) {
+		alter_space_move_indexes(alter, 0, iid);
 		(void) new DropIndex(alter, old_index->def);
 	}
 	/* Case 2: create an index, if it is simply created. */
 	if (old_index == NULL && new_tuple != NULL) {
+		alter_space_move_indexes(alter, 0, iid);
 		CreateIndex *create_index = new CreateIndex(alter);
 		create_index->new_index_def =
 			index_def_new_from_tuple(new_tuple, old_space);
+		index_def_update_optionality(create_index->new_index_def,
+					     alter->new_min_field_count);
 	}
 	/* Case 3 and 4: check if we need to rebuild index data. */
 	if (old_index != NULL && new_tuple != NULL) {
@@ -1687,6 +1717,43 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
 		index_def = index_def_new_from_tuple(new_tuple, old_space);
 		auto index_def_guard =
 			make_scoped_guard([=] { index_def_delete(index_def); });
+		/*
+		 * To detect which key parts are optional,
+		 * min_field_count is required, but
+		 * min_field_count from the old space format can
+		 * not be used. For example, consider a space with
+		 * no format, a primary index on the first field
+		 * and a single secondary index on a non-nullable
+		 * second field. Min field count here is 2. Now
+		 * alter the secondary index to make its part
+		 * nullable. In 'old_space' min_field_count is
+		 * still 2, but actually it is already 1. The
+		 * actual min_field_count must be calculated from
+		 * the old unchanged indexes, the NEW definition
+		 * of the updated index and the space format
+		 * defined by a user.
+		 */
+		struct key_def **keys;
+		size_t bsize = old_space->index_count * sizeof(keys[0]);
+		keys = (struct key_def **) region_alloc_xc(&fiber()->gc,
+							   bsize);
+		for (uint32_t i = 0, j = 0; i < old_space->index_count;
+		     ++i) {
+			struct index_def *d = old_space->index[i]->def;
+			if (d->iid != index_def->iid)
+				keys[j++] = d->key_def;
+			else
+				keys[j++] = index_def->key_def;
+		}
+		struct space_def *def = old_space->def;
+		alter->new_min_field_count =
+			tuple_format_min_field_count(keys,
+						     old_space->index_count,
+						     def->fields,
+						     def->field_count);
+		index_def_update_optionality(index_def,
+					     alter->new_min_field_count);
+		alter_space_move_indexes(alter, 0, iid);
 		if (index_def_cmp(index_def, old_index->def) == 0) {
 			/* Index is not changed so just move it. */
 			(void) new MoveIndex(alter, old_index->def->iid);
diff --git a/src/box/index_def.h b/src/box/index_def.h
index 18f941364d..251506a85e 100644
--- a/src/box/index_def.h
+++ b/src/box/index_def.h
@@ -163,6 +163,19 @@ index_def_dup(const struct index_def *def);
 void
 index_def_delete(struct index_def *def);
 
+/**
+ * Update 'has_optional_parts' property of key definitions.
+ * @param def Index def, containing key definitions to update.
+ * @param min_field_count Minimal field count. All parts beyond
+ *        this count are optional.
+ */
+static inline void
+index_def_update_optionality(struct index_def *def, uint32_t min_field_count)
+{
+	key_def_update_optionality(def->key_def, min_field_count);
+	key_def_update_optionality(def->cmp_def, min_field_count);
+}
+
 /**
  * Add an index definition to a list, preserving the
  * first position of the primary key.
diff --git a/src/box/key_def.cc b/src/box/key_def.cc
index 06b12dfd22..955349cf33 100644
--- a/src/box/key_def.cc
+++ b/src/box/key_def.cc
@@ -276,6 +276,24 @@ key_def_set_part(struct key_def *def, uint32_t part_no, uint32_t fieldno,
 		key_def_set_cmp(def);
 }
 
+void
+key_def_update_optionality(struct key_def *def, uint32_t min_field_count)
+{
+	def->has_optional_parts = false;
+	for (uint32_t i = 0; i < def->part_count; ++i) {
+		struct key_part *part = &def->parts[i];
+		def->has_optional_parts |= part->is_nullable &&
+					   min_field_count < part->fieldno + 1;
+		/*
+		 * One optional part is enough to switch to new
+		 * comparators.
+		 */
+		if (def->has_optional_parts)
+			break;
+	}
+	key_def_set_cmp(def);
+}
+
 int
 key_def_snprint_parts(char *buf, int size, const struct key_part_def *parts,
 		      uint32_t part_count)
@@ -531,6 +549,8 @@ key_def_merge(const struct key_def *first, const struct key_def *second)
 	new_def->part_count = new_part_count;
 	new_def->unique_part_count = new_part_count;
 	new_def->is_nullable = first->is_nullable || second->is_nullable;
+	new_def->has_optional_parts = first->has_optional_parts ||
+				      second->has_optional_parts;
 	/* Write position in the new key def. */
 	uint32_t pos = 0;
 	/* Append first key def's parts to the new index_def. */
diff --git a/src/box/key_def.h b/src/box/key_def.h
index 50cc547c1e..4726c38a22 100644
--- a/src/box/key_def.h
+++ b/src/box/key_def.h
@@ -126,6 +126,11 @@ struct key_def {
 	uint32_t unique_part_count;
 	/** True, if at least one part can store NULL. */
 	bool is_nullable;
+	/**
+	 * True, if some key parts can be absent in a tuple. Such
+	 * fields are assumed to be MP_NIL.
+	 */
+	bool has_optional_parts;
 	/** Key fields mask. @sa column_mask.h for details. */
 	uint64_t column_mask;
 	/** The size of the 'parts' array. */
@@ -232,6 +237,16 @@ void
 key_def_set_part(struct key_def *def, uint32_t part_no, uint32_t fieldno,
 		 enum field_type type, bool is_nullable, struct coll *coll);
 
+/**
+ * Update 'has_optional_parts' of @a key_def according to
+ * @a min_field_count.
+ * @param def Key definition to update.
+ * @param min_field_count Minimal field count. All parts beyond
+ *        this count are optional.
+ */
+void
+key_def_update_optionality(struct key_def *def, uint32_t min_field_count);
+
 /**
  * An snprint-style function to print a key definition.
  */
diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index 3c6adfbc6c..edbbfb12dc 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -426,15 +426,23 @@ tuple_compare_field_with_hint(const char *field_a, enum mp_type a_type,
 	}
 }
 
-template<bool is_nullable>
+template<bool is_nullable, bool has_optional_parts>
 static inline int
 tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 		       const struct key_def *key_def)
 {
+	assert(!has_optional_parts || is_nullable);
+	assert(is_nullable == key_def->is_nullable);
+	assert(has_optional_parts == key_def->has_optional_parts);
 	const struct key_part *part = key_def->parts;
 	const char *tuple_a_raw = tuple_data(tuple_a);
 	const char *tuple_b_raw = tuple_data(tuple_b);
 	if (key_def->part_count == 1 && part->fieldno == 0) {
+		/*
+		 * First field can not be optional - empty tuples
+		 * can not exist.
+		 */
+		assert(!has_optional_parts);
 		mp_decode_array(&tuple_a_raw);
 		mp_decode_array(&tuple_b_raw);
 		if (! is_nullable) {
@@ -458,8 +466,8 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 	const uint32_t *field_map_a = tuple_field_map(tuple_a);
 	const uint32_t *field_map_b = tuple_field_map(tuple_b);
 	const struct key_part *end;
-	const char *field_a;
-	const char *field_b;
+	const char *field_a, *field_b;
+	enum mp_type a_type, b_type;
 	int rc;
 	if (is_nullable)
 		end = part + key_def->unique_part_count;
@@ -471,7 +479,7 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 					  part->fieldno);
 		field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b,
 					  part->fieldno);
-		assert(field_a != NULL && field_b != NULL);
+		assert(has_optional_parts ||
+		       (field_a != NULL && field_b != NULL));
 		if (! is_nullable) {
 			rc = tuple_compare_field(field_a, field_b, part->type,
 						 part->coll);
@@ -480,8 +488,13 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 			else
 				continue;
 		}
-		enum mp_type a_type = mp_typeof(*field_a);
-		enum mp_type b_type = mp_typeof(*field_b);
+		if (has_optional_parts) {
+			a_type = field_a != NULL ? mp_typeof(*field_a) : MP_NIL;
+			b_type = field_b != NULL ? mp_typeof(*field_b) : MP_NIL;
+		} else {
+			a_type = mp_typeof(*field_a);
+			b_type = mp_typeof(*field_b);
+		}
 		if (a_type == MP_NIL) {
 			if (b_type != MP_NIL)
 				return -1;
@@ -515,6 +528,10 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 					  part->fieldno);
 		field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b,
 					  part->fieldno);
+		/*
+		 * Extended parts belong to the primary key and
+		 * can be neither absent nor NULL.
+		 */
 		assert(field_a != NULL && field_b != NULL);
 		rc = tuple_compare_field(field_a, field_b, part->type,
 					 part->coll);
@@ -524,18 +541,22 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 	return 0;
 }
 
-template<bool is_nullable>
+template<bool is_nullable, bool has_optional_parts>
 static inline int
 tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 				uint32_t part_count,
 				const struct key_def *key_def)
 {
+	assert(!has_optional_parts || is_nullable);
+	assert(is_nullable == key_def->is_nullable);
+	assert(has_optional_parts == key_def->has_optional_parts);
 	assert(key != NULL || part_count == 0);
 	assert(part_count <= key_def->part_count);
 	const struct key_part *part = key_def->parts;
 	const struct tuple_format *format = tuple_format(tuple);
 	const char *tuple_raw = tuple_data(tuple);
 	const uint32_t *field_map = tuple_field_map(tuple);
+	enum mp_type a_type, b_type;
 	if (likely(part_count == 1)) {
 		const char *field;
 		field = tuple_field_raw(format, tuple_raw, field_map,
@@ -544,8 +565,11 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 			return tuple_compare_field(field, key, part->type,
 						   part->coll);
 		}
-		enum mp_type a_type = mp_typeof(*field);
-		enum mp_type b_type = mp_typeof(*key);
+		if (has_optional_parts)
+			a_type = field != NULL ? mp_typeof(*field) : MP_NIL;
+		else
+			a_type = mp_typeof(*field);
+		b_type = mp_typeof(*key);
 		if (a_type == MP_NIL) {
 			return b_type == MP_NIL ? 0 : -1;
 		} else if (b_type == MP_NIL) {
@@ -564,15 +588,18 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 		field = tuple_field_raw(format, tuple_raw, field_map,
 					part->fieldno);
 		if (! is_nullable) {
-			int rc = tuple_compare_field(field, key, part->type,
-						     part->coll);
+			rc = tuple_compare_field(field, key, part->type,
+						 part->coll);
 			if (rc != 0)
 				return rc;
 			else
 				continue;
 		}
-		enum mp_type a_type = mp_typeof(*field);
-		enum mp_type b_type = mp_typeof(*key);
+		if (has_optional_parts)
+			a_type = field != NULL ? mp_typeof(*field) : MP_NIL;
+		else
+			a_type = mp_typeof(*field);
+		b_type = mp_typeof(*key);
 		if (a_type == MP_NIL) {
 			if (b_type != MP_NIL)
 				return -1;
@@ -645,19 +672,46 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
 	return 0;
 }
 
-template<bool is_nullable>
+template<bool is_nullable, bool has_optional_parts>
 static inline int
-tuple_compare_with_key_sequential(const struct tuple *tuple,
-	const char *key, uint32_t part_count, const struct key_def *key_def)
+tuple_compare_with_key_sequential(const struct tuple *tuple, const char *key,
+				  uint32_t part_count,
+				  const struct key_def *key_def)
 {
+	assert(!has_optional_parts || is_nullable);
 	assert(key_def_is_sequential(key_def));
+	assert(is_nullable == key_def->is_nullable);
+	assert(has_optional_parts == key_def->has_optional_parts);
 	const char *tuple_key = tuple_data(tuple);
-	uint32_t tuple_field_count = mp_decode_array(&tuple_key);
-	assert(tuple_field_count >= key_def->part_count);
-	assert(part_count <= key_def->part_count);
-	(void) tuple_field_count;
-	return key_compare_parts<is_nullable>(tuple_key, key, part_count,
-					      key_def);
+	uint32_t field_count = mp_decode_array(&tuple_key);
+	uint32_t cmp_part_count;
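+	/*
+	 * With optional parts the tuple can be shorter than the
+	 * key: compare only the fields it actually has.
+	 */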
+	if (has_optional_parts && field_count < part_count) {
+		cmp_part_count = field_count;
+	} else {
+		assert(field_count >= part_count);
+		cmp_part_count = part_count;
+	}
+	int rc = key_compare_parts<is_nullable>(tuple_key, key, cmp_part_count,
+						key_def);
+	if (!has_optional_parts || rc != 0)
+		return rc;
+	/*
+	 * If some tuple indexed fields are absent, then check
+	 * corresponding key fields to be equal to NULL.
+	 */
+	if (field_count < part_count) {
+		/*
+		 * The key's and the tuple's first field_count
+		 * fields are equal, and so are their binary sizes.
+		 */
+		key += tuple->bsize - mp_sizeof_array(field_count);
+		for (uint32_t i = field_count; i < part_count;
+		     ++i, mp_next(&key)) {
+			if (mp_typeof(*key) != MP_NIL)
+				return -1;
+		}
+	}
+	return 0;
 }
 
 int
@@ -679,19 +733,21 @@ key_compare(const char *key_a, const char *key_b,
 	}
 }
 
-template<bool is_nullable>
+template <bool is_nullable, bool has_optional_parts>
 static int
 tuple_compare_sequential(const struct tuple *tuple_a,
 			 const struct tuple *tuple_b,
 			 const struct key_def *key_def)
 {
+	assert(!has_optional_parts || is_nullable);
+	assert(has_optional_parts == key_def->has_optional_parts);
 	assert(key_def_is_sequential(key_def));
 	assert(is_nullable == key_def->is_nullable);
 	const char *key_a = tuple_data(tuple_a);
 	uint32_t fc_a = mp_decode_array(&key_a);
 	const char *key_b = tuple_data(tuple_b);
 	uint32_t fc_b = mp_decode_array(&key_b);
-	if (! is_nullable) {
+	if (!has_optional_parts && !is_nullable) {
 		assert(fc_a >= key_def->part_count);
 		assert(fc_b >= key_def->part_count);
 		return key_compare_parts<false>(key_a, key_b,
@@ -702,9 +758,15 @@ tuple_compare_sequential(const struct tuple *tuple_a,
 	const struct key_part *end = part + key_def->unique_part_count;
 	int rc;
 	uint32_t i = 0;
-	for (; part < end; ++part, ++i, mp_next(&key_a), mp_next(&key_b)) {
-		enum mp_type a_type = mp_typeof(*key_a);
-		enum mp_type b_type = mp_typeof(*key_b);
+	for (; part < end; ++part, ++i) {
+		enum mp_type a_type, b_type;
+		if (has_optional_parts) {
+			a_type = i >= fc_a ? MP_NIL : mp_typeof(*key_a);
+			b_type = i >= fc_b ? MP_NIL : mp_typeof(*key_b);
+		} else {
+			a_type = mp_typeof(*key_a);
+			b_type = mp_typeof(*key_b);
+		}
 		if (a_type == MP_NIL) {
 			if (b_type != MP_NIL)
 				return -1;
@@ -718,11 +780,21 @@ tuple_compare_sequential(const struct tuple *tuple_a,
 			if (rc != 0)
 				return rc;
 		}
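+		/* Do not advance past the last field of a short tuple. */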
+		if (!has_optional_parts || i < fc_a)
+			mp_next(&key_a);
+		if (!has_optional_parts || i < fc_b)
+			mp_next(&key_b);
 	}
 	if (! was_null_met)
 		return 0;
 	end = key_def->parts + key_def->part_count;
 	for (; part < end; ++part, ++i, mp_next(&key_a), mp_next(&key_b)) {
+		/*
+		 * If the tuples are equal in the first
+		 * unique_part_count parts, the remaining parts
+		 * belong to the primary key and can be neither
+		 * absent nor NULL.
+		 */
+		assert(i < fc_a && i < fc_b);
 		rc = tuple_compare_field(key_a, key_b, part->type,
 					 part->coll);
 		if (rc != 0)
@@ -911,13 +983,21 @@ static const comparator_signature cmp_arr[] = {
 #undef COMPARATOR
 
 tuple_compare_t
-tuple_compare_create(const struct key_def *def) {
+tuple_compare_create(const struct key_def *def)
+{
 	if (def->is_nullable) {
-		if (key_def_is_sequential(def))
-			return tuple_compare_sequential<true>;
-		else
-			return tuple_compare_slowpath<true>;
+		if (key_def_is_sequential(def)) {
+			if (def->has_optional_parts)
+				return tuple_compare_sequential<true, true>;
+			else
+				return tuple_compare_sequential<true, false>;
+		} else if (def->has_optional_parts) {
+			return tuple_compare_slowpath<true, true>;
+		} else {
+			return tuple_compare_slowpath<true, false>;
+		}
 	}
+	assert(! def->has_optional_parts);
 	if (!key_def_has_collation(def)) {
 		/* Precalculated comparators don't use collation */
 		for (uint32_t k = 0;
@@ -935,9 +1015,9 @@ tuple_compare_create(const struct key_def *def) {
 		}
 	}
 	if (key_def_is_sequential(def))
-		return tuple_compare_sequential<false>;
+		return tuple_compare_sequential<false, false>;
 	else
-		return tuple_compare_slowpath<false>;
+		return tuple_compare_slowpath<false, false>;
 }
 
 /* }}} tuple_compare */
@@ -1123,10 +1203,21 @@ tuple_compare_with_key_t
 tuple_compare_with_key_create(const struct key_def *def)
 {
 	if (def->is_nullable) {
-		if (key_def_is_sequential(def))
-			return tuple_compare_with_key_sequential<true>;
-		return tuple_compare_with_key_slowpath<true>;
+		if (key_def_is_sequential(def)) {
+			if (def->has_optional_parts) {
+				return tuple_compare_with_key_sequential<true,
+									 true>;
+			} else {
+				return tuple_compare_with_key_sequential<true,
+									 false>;
+			}
+		} else if (def->has_optional_parts) {
+			return tuple_compare_with_key_slowpath<true, true>;
+		} else {
+			return tuple_compare_with_key_slowpath<true, false>;
+		}
 	}
+	assert(! def->has_optional_parts);
 	if (!key_def_has_collation(def)) {
 		/* Precalculated comparators don't use collation */
 		for (uint32_t k = 0;
@@ -1147,8 +1238,9 @@ tuple_compare_with_key_create(const struct key_def *def)
 		}
 	}
 	if (key_def_is_sequential(def))
-		return tuple_compare_with_key_sequential<false>;
-	return tuple_compare_with_key_slowpath<false>;
+		return tuple_compare_with_key_sequential<false, false>;
+	else
+		return tuple_compare_with_key_slowpath<false, false>;
 }
 
 /* }}} tuple_compare_with_key */
diff --git a/src/box/tuple_extract_key.cc b/src/box/tuple_extract_key.cc
index ea58a536e1..880abb6bff 100644
--- a/src/box/tuple_extract_key.cc
+++ b/src/box/tuple_extract_key.cc
@@ -2,29 +2,52 @@
 #include "tuple.h"
 #include "fiber.h"
 
+enum { MSGPACK_NULL = 0xc0 };
+
+/** True, if a key can contain two or more parts in sequence. */
+static bool
+key_def_contains_sequential_parts(const struct key_def *def)
+{
+	for (uint32_t i = 0; i < def->part_count - 1; ++i) {
+		if (def->parts[i].fieldno + 1 == def->parts[i + 1].fieldno)
+			return true;
+	}
+	return false;
+}
+
 /**
  * Optimized version of tuple_extract_key_raw() for sequential key defs
  * @copydoc tuple_extract_key_raw()
  */
+template <bool has_optional_parts>
 static char *
 tuple_extract_key_sequential_raw(const char *data, const char *data_end,
 				 const struct key_def *key_def,
 				 uint32_t *key_size)
 {
+	assert(!has_optional_parts || key_def->is_nullable);
 	assert(key_def_is_sequential(key_def));
+	assert(has_optional_parts == key_def->has_optional_parts);
+	assert(data_end != NULL);
+	assert(mp_sizeof_nil() == 1);
 	const char *field_start = data;
 	uint32_t bsize = mp_sizeof_array(key_def->part_count);
-
-	mp_decode_array(&field_start);
+	uint32_t field_count = mp_decode_array(&field_start);
 	const char *field_end = field_start;
-
-	for (uint32_t i = 0; i < key_def->part_count; i++)
-		mp_next(&field_end);
+	uint32_t null_count;
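+	/* Tail key parts missing from the tuple are encoded as NILs. */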
+	if (!has_optional_parts || field_count > key_def->part_count) {
+		for (uint32_t i = 0; i < key_def->part_count; i++)
+			mp_next(&field_end);
+		null_count = 0;
+	} else {
+		assert(key_def->is_nullable);
+		null_count = key_def->part_count - field_count;
+		field_end = data_end;
+		bsize += null_count * mp_sizeof_nil();
+	}
+	assert(field_end - field_start <= data_end - data);
 	bsize += field_end - field_start;
 
-	assert(!data_end || (field_end - field_start <= data_end - data));
-	(void) data_end;
-
 	char *key = (char *) region_alloc(&fiber()->gc, bsize);
 	if (key == NULL) {
 		diag_set(OutOfMemory, bsize, "region",
@@ -33,6 +56,10 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end,
 	}
 	char *key_buf = mp_encode_array(key, key_def->part_count);
 	memcpy(key_buf, field_start, field_end - field_start);
+	if (has_optional_parts && null_count > 0) {
+		key_buf += field_end - field_start;
+		memset(key_buf, MSGPACK_NULL, null_count);
+	}
 
 	if (key_size != NULL)
 		*key_size = bsize;
@@ -43,54 +70,78 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end,
  * Optimized version of tuple_extract_key() for sequential key defs
  * @copydoc tuple_extract_key()
  */
+template <bool has_optional_parts>
 static inline char *
 tuple_extract_key_sequential(const struct tuple *tuple,
 			     const struct key_def *key_def,
 			     uint32_t *key_size)
 {
 	assert(key_def_is_sequential(key_def));
+	assert(!has_optional_parts || key_def->is_nullable);
+	assert(has_optional_parts == key_def->has_optional_parts);
 	const char *data = tuple_data(tuple);
-	return tuple_extract_key_sequential_raw(data, NULL, key_def, key_size);
+	const char *data_end = data + tuple->bsize;
+	return tuple_extract_key_sequential_raw<has_optional_parts>(data,
+								    data_end,
+								    key_def,
+								    key_size);
 }
 
 /**
  * General-purpose implementation of tuple_extract_key()
  * @copydoc tuple_extract_key()
  */
-template <bool contains_sequential_parts>
+template <bool contains_sequential_parts, bool has_optional_parts>
 static char *
 tuple_extract_key_slowpath(const struct tuple *tuple,
 			   const struct key_def *key_def, uint32_t *key_size)
 {
+	assert(!has_optional_parts || key_def->is_nullable);
+	assert(has_optional_parts == key_def->has_optional_parts);
+	assert(contains_sequential_parts ==
+	       key_def_contains_sequential_parts(key_def));
+	assert(mp_sizeof_nil() == 1);
 	const char *data = tuple_data(tuple);
 	uint32_t part_count = key_def->part_count;
 	uint32_t bsize = mp_sizeof_array(part_count);
 	const struct tuple_format *format = tuple_format(tuple);
 	const uint32_t *field_map = tuple_field_map(tuple);
+	const char *tuple_end = data + tuple->bsize;
 
 	/* Calculate the key size. */
 	for (uint32_t i = 0; i < part_count; ++i) {
 		const char *field =
 			tuple_field_raw(format, data, field_map,
 					key_def->parts[i].fieldno);
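+		/* An absent field is accounted for as a one-byte MP_NIL. */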
+		if (has_optional_parts && field == NULL) {
+			bsize += mp_sizeof_nil();
+			continue;
+		}
+		assert(field != NULL);
 		const char *end = field;
 		if (contains_sequential_parts) {
 			/*
 			 * Skip sequential part in order to
 			 * minimize tuple_field_raw() calls.
 			 */
-			for (; i < key_def->part_count - 1; i++) {
+			for (; i < part_count - 1; i++) {
 				if (key_def->parts[i].fieldno + 1 !=
-					key_def->parts[i + 1].fieldno) {
+				    key_def->parts[i + 1].fieldno) {
 					/*
 					 * End of sequential part.
 					 */
 					break;
 				}
-				mp_next(&end);
+				if (!has_optional_parts || end < tuple_end)
+					mp_next(&end);
+				else
+					bsize += mp_sizeof_nil();
 			}
 		}
-		mp_next(&end);
+		if (!has_optional_parts || end < tuple_end)
+			mp_next(&end);
+		else
+			bsize += mp_sizeof_nil();
 		bsize += end - field;
 	}
 
@@ -104,27 +155,42 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
 		const char *field =
 			tuple_field_raw(format, data, field_map,
 					key_def->parts[i].fieldno);
+		if (has_optional_parts && field == NULL) {
+			key_buf = mp_encode_nil(key_buf);
+			continue;
+		}
 		const char *end = field;
+		uint32_t null_count = 0;
 		if (contains_sequential_parts) {
 			/*
 			 * Skip sequential part in order to
 			 * minimize tuple_field_raw() calls.
 			 */
-			for (; i < key_def->part_count - 1; i++) {
+			for (; i < part_count - 1; i++) {
 				if (key_def->parts[i].fieldno + 1 !=
-					key_def->parts[i + 1].fieldno) {
+				    key_def->parts[i + 1].fieldno) {
 					/*
 					 * End of sequential part.
 					 */
 					break;
 				}
-				mp_next(&end);
+				if (!has_optional_parts || end < tuple_end)
+					mp_next(&end);
+				else
+					++null_count;
 			}
 		}
-		mp_next(&end);
+		if (!has_optional_parts || end < tuple_end)
+			mp_next(&end);
+		else
+			++null_count;
 		bsize = end - field;
 		memcpy(key_buf, field, bsize);
 		key_buf += bsize;
+		if (has_optional_parts && null_count != 0) {
+			memset(key_buf, MSGPACK_NULL, null_count);
+			key_buf += null_count * mp_sizeof_nil();
+		}
 	}
 	if (key_size != NULL)
 		*key_size = key_buf - key;
@@ -135,11 +201,15 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
  * General-purpose version of tuple_extract_key_raw()
  * @copydoc tuple_extract_key_raw()
  */
+template <bool has_optional_parts>
 static char *
 tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 			       const struct key_def *key_def,
 			       uint32_t *key_size)
 {
+	assert(!has_optional_parts || key_def->is_nullable);
+	assert(has_optional_parts == key_def->has_optional_parts);
+	assert(mp_sizeof_nil() == 1);
 	/* allocate buffer with maximal possible size */
 	char *key = (char *) region_alloc(&fiber()->gc, data_end - data);
 	if (key == NULL) {
@@ -149,7 +219,12 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 	}
 	char *key_buf = mp_encode_array(key, key_def->part_count);
 	const char *field0 = data;
-	mp_decode_array(&field0);
+	uint32_t field_count = mp_decode_array(&field0);
+	/*
+	 * A tuple can not be empty - at least a pk always exists.
+	 */
+	assert(field_count > 0);
+	(void) field_count;
 	const char *field0_end = field0;
 	mp_next(&field0_end);
 	const char *field = field0;
@@ -157,11 +232,14 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 	uint32_t current_fieldno = 0;
 	for (uint32_t i = 0; i < key_def->part_count; i++) {
 		uint32_t fieldno = key_def->parts[i].fieldno;
+		uint32_t null_count = 0;
 		for (; i < key_def->part_count - 1; i++) {
 			if (key_def->parts[i].fieldno + 1 !=
 			    key_def->parts[i + 1].fieldno)
 				break;
 		}
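+		/* Fieldno of the last part in this sequential run. */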
+		uint32_t end_fieldno = key_def->parts[i].fieldno;
+
 		if (fieldno < current_fieldno) {
 			/* Rewind. */
 			field = field0;
@@ -169,6 +247,19 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 			current_fieldno = 0;
 		}
 
+		/*
+		 * The first fieldno of a key's column range can
+		 * exceed the tuple's field count for nullable
+		 * indexes, because indexed fields can be absent.
+		 * Treat such fields as NULLs.
+		 */
+		if (has_optional_parts && fieldno >= field_count) {
+			/* Nullify entire columns range. */
+			null_count = end_fieldno - fieldno + 1;
+			memset(key_buf, MSGPACK_NULL, null_count);
+			key_buf += null_count * mp_sizeof_nil();
+			continue;
+		}
 		while (current_fieldno < fieldno) {
 			/* search first field of key in tuple raw data */
 			field = field_end;
@@ -176,31 +267,33 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 			current_fieldno++;
 		}
 
-		while (current_fieldno < key_def->parts[i].fieldno) {
-			/* search the last field in subsequence */
-			mp_next(&field_end);
-			current_fieldno++;
+		/*
+		 * If the last fieldno exceeds the tuple's field
+		 * count, fill the rest of the columns with NULLs.
+		 */
+		if (has_optional_parts && end_fieldno >= field_count) {
+			null_count = end_fieldno - field_count + 1;
+			field_end = data_end;
+		} else {
+			while (current_fieldno < end_fieldno) {
+				mp_next(&field_end);
+				current_fieldno++;
+			}
 		}
 		memcpy(key_buf, field, field_end - field);
 		key_buf += field_end - field;
-		assert(key_buf - key <= data_end - data);
+		if (has_optional_parts && null_count != 0) {
+			memset(key_buf, MSGPACK_NULL, null_count);
+			key_buf += null_count * mp_sizeof_nil();
+		} else {
+			assert(key_buf - key <= data_end - data);
+		}
 	}
 	if (key_size != NULL)
 		*key_size = (uint32_t)(key_buf - key);
 	return key;
 }
 
-/** True, if a key con contain two or more parts in sequence. */
-static bool
-key_def_contains_sequential_parts(struct key_def *def)
-{
-	for (uint32_t i = 0; i < def->part_count - 1; ++i) {
-		if (def->parts[i].fieldno + 1 == def->parts[i + 1].fieldno)
-			return true;
-	}
-	return false;
-}
-
 /**
  * Initialize tuple_extract_key() and tuple_extract_key_raw()
  */
@@ -208,16 +301,45 @@ void
 tuple_extract_key_set(struct key_def *key_def)
 {
 	if (key_def_is_sequential(key_def)) {
-		key_def->tuple_extract_key = tuple_extract_key_sequential;
-		key_def->tuple_extract_key_raw = tuple_extract_key_sequential_raw;
-	} else {
-		if (key_def_contains_sequential_parts(key_def)) {
+		if (key_def->has_optional_parts) {
+			assert(key_def->is_nullable);
 			key_def->tuple_extract_key =
-				tuple_extract_key_slowpath<true>;
+				tuple_extract_key_sequential<true>;
+			key_def->tuple_extract_key_raw =
+				tuple_extract_key_sequential_raw<true>;
 		} else {
 			key_def->tuple_extract_key =
-				tuple_extract_key_slowpath<false>;
+				tuple_extract_key_sequential<false>;
+			key_def->tuple_extract_key_raw =
+				tuple_extract_key_sequential_raw<false>;
 		}
-		key_def->tuple_extract_key_raw = tuple_extract_key_slowpath_raw;
+	} else {
+		if (key_def->has_optional_parts) {
+			assert(key_def->is_nullable);
+			if (key_def_contains_sequential_parts(key_def)) {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<true, true>;
+			} else {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<false, true>;
+			}
+		} else {
+			if (key_def_contains_sequential_parts(key_def)) {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<true, false>;
+			} else {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<false,
+								   false>;
+			}
+		}
+	}
+	if (key_def->has_optional_parts) {
+		assert(key_def->is_nullable);
+		key_def->tuple_extract_key_raw =
+			tuple_extract_key_slowpath_raw<true>;
+	} else {
+		key_def->tuple_extract_key_raw =
+			tuple_extract_key_slowpath_raw<false>;
 	}
 }
diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
index e42fc039ed..e458f49a0d 100644
--- a/src/box/tuple_format.c
+++ b/src/box/tuple_format.c
@@ -49,6 +49,9 @@ tuple_format_create(struct tuple_format *format, struct key_def * const *keys,
 		    uint16_t key_count, const struct field_def *fields,
 		    uint32_t field_count)
 {
+	format->min_field_count =
+		tuple_format_min_field_count(keys, key_count, fields,
+					     field_count);
 	if (format->field_count == 0) {
 		format->field_map_size = 0;
 		return 0;
@@ -59,8 +62,6 @@ tuple_format_create(struct tuple_format *format, struct key_def * const *keys,
 		format->fields[i].type = fields[i].type;
 		format->fields[i].offset_slot = TUPLE_OFFSET_SLOT_NIL;
 		format->fields[i].is_nullable = fields[i].is_nullable;
-		if (i + 1 > format->min_field_count && !fields[i].is_nullable)
-			format->min_field_count = i + 1;
 	}
 	/* Initialize remaining fields */
 	for (uint32_t i = field_count; i < format->field_count; i++)
@@ -236,7 +237,7 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count,
 	format->field_count = field_count;
 	format->index_field_count = index_field_count;
 	format->exact_field_count = 0;
-	format->min_field_count = index_field_count;
+	format->min_field_count = 0;
 	return format;
 }
 
@@ -400,6 +401,14 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
 	++field;
 	uint32_t i = 1;
 	uint32_t defined_field_count = MIN(field_count, format->field_count);
+	if (field_count < format->index_field_count) {
+		/*
+		 * Zero the field map so that absent key fields can
+		 * be detected in tuple_field() by a 0 offset.
+		 */
+		memset((char *)field_map - format->field_map_size, 0,
+		       format->field_map_size);
+	}
 	for (; i < defined_field_count; ++i, ++field) {
 		mp_type = mp_typeof(*pos);
 		if (key_mp_type_validate(field->type, mp_type, ER_FIELD_TYPE,
@@ -415,6 +424,28 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
 	return 0;
 }
 
+uint32_t
+tuple_format_min_field_count(struct key_def * const *keys, uint16_t key_count,
+			     const struct field_def *space_fields,
+			     uint32_t space_field_count)
+{
+	uint32_t min_field_count = 0;
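+	/* Each non-nullable space-format field raises the minimum. */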
+	for (uint32_t i = 0; i < space_field_count; ++i) {
+		if (! space_fields[i].is_nullable)
+			min_field_count = i + 1;
+	}
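+	/* Non-nullable key parts raise it as well. */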
+	for (uint32_t i = 0; i < key_count; ++i) {
+		const struct key_def *kd = keys[i];
+		for (uint32_t j = 0; j < kd->part_count; ++j) {
+			const struct key_part *kp = &kd->parts[j];
+			if (!kp->is_nullable &&
+			    kp->fieldno + 1 > min_field_count)
+				min_field_count = kp->fieldno + 1;
+		}
+	}
+	return min_field_count;
+}
+
 /** Destroy tuple format subsystem and free resourses */
 void
 tuple_format_free()
diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h
index 77c8e404f3..d35182df7d 100644
--- a/src/box/tuple_format.h
+++ b/src/box/tuple_format.h
@@ -245,6 +245,21 @@ tuple_format_meta_size(const struct tuple_format *format)
 	return format->extra_size + format->field_map_size;
 }
 
+/**
+ * Calculate minimal field count of tuples with specified keys and
+ * space format.
+ * @param keys Array of key definitions of indexes.
+ * @param key_count Length of @a keys.
+ * @param space_fields Array of fields from a space format.
+ * @param space_field_count Length of @a space_fields.
+ *
+ * @retval Minimal field count.
+ */
+uint32_t
+tuple_format_min_field_count(struct key_def * const *keys, uint16_t key_count,
+			     const struct field_def *space_fields,
+			     uint32_t space_field_count);
+
 typedef struct tuple_format box_tuple_format_t;
 
 /** \cond public */
@@ -313,7 +328,7 @@ static inline const char *
 tuple_field_raw(const struct tuple_format *format, const char *tuple,
 		const uint32_t *field_map, uint32_t field_no)
 {
-	if (likely(field_no < format->field_count)) {
+	if (likely(field_no < format->index_field_count)) {
 		/* Indexed field */
 
 		if (field_no == 0) {
@@ -322,8 +337,12 @@ tuple_field_raw(const struct tuple_format *format, const char *tuple,
 		}
 
 		int32_t offset_slot = format->fields[field_no].offset_slot;
-		if (offset_slot != TUPLE_OFFSET_SLOT_NIL)
-			return tuple + field_map[offset_slot];
+		if (offset_slot != TUPLE_OFFSET_SLOT_NIL) {
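+			/* A zeroed map slot means the field is absent. */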
+			if (field_map[offset_slot] != 0)
+				return tuple + field_map[offset_slot];
+			else
+				return NULL;
+		}
 	}
 	ERROR_INJECT(ERRINJ_TUPLE_FIELD, return NULL);
 	uint32_t field_count = mp_decode_array(&tuple);
diff --git a/src/box/tuple_hash.cc b/src/box/tuple_hash.cc
index e08bc5ff56..0f7ba91d09 100644
--- a/src/box/tuple_hash.cc
+++ b/src/box/tuple_hash.cc
@@ -212,6 +212,7 @@ static const hasher_signature hash_arr[] = {
 
 #undef HASHER
 
+template <bool has_optional_parts>
 uint32_t
 tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def);
 
@@ -254,7 +255,10 @@ tuple_hash_func_set(struct key_def *key_def) {
 	}
 
 slowpath:
-	key_def->tuple_hash = tuple_hash_slowpath;
+	if (key_def->has_optional_parts)
+		key_def->tuple_hash = tuple_hash_slowpath<true>;
+	else
+		key_def->tuple_hash = tuple_hash_slowpath<false>;
 	key_def->key_hash = key_hash_slowpath;
 }
 
@@ -295,17 +299,32 @@ tuple_hash_field(uint32_t *ph1, uint32_t *pcarry, const char **field,
 	return size;
 }
 
+static inline uint32_t
+tuple_hash_null(uint32_t *ph1, uint32_t *pcarry)
+{
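+	/* Hash the single MP_NIL byte (0xc0) for an absent field. */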
+	assert(mp_sizeof_nil() == 1);
+	const char null = 0xc0;
+	PMurHash32_Process(ph1, pcarry, &null, 1);
+	return mp_sizeof_nil();
+}
 
+template <bool has_optional_parts>
 uint32_t
 tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
 {
+	assert(has_optional_parts == key_def->has_optional_parts);
 	uint32_t h = HASH_SEED;
 	uint32_t carry = 0;
 	uint32_t total_size = 0;
 	uint32_t prev_fieldno = key_def->parts[0].fieldno;
-	const char* field = tuple_field(tuple, key_def->parts[0].fieldno);
-	total_size += tuple_hash_field(&h, &carry, &field,
-				       key_def->parts[0].coll);
+	const char *field = tuple_field(tuple, key_def->parts[0].fieldno);
+	const char *end = (char *)tuple + tuple_size(tuple);
+	if (has_optional_parts && field == NULL) {
+		total_size += tuple_hash_null(&h, &carry);
+	} else {
+		total_size += tuple_hash_field(&h, &carry, &field,
+					       key_def->parts[0].coll);
+	}
 	for (uint32_t part_id = 1; part_id < key_def->part_count; part_id++) {
 		/* If parts of key_def are not sequential we need to call
 		 * tuple_field. Otherwise, tuple is hashed sequentially without
@@ -314,8 +333,13 @@ tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
 		if (prev_fieldno + 1 != key_def->parts[part_id].fieldno) {
 			field = tuple_field(tuple, key_def->parts[part_id].fieldno);
 		}
-		total_size += tuple_hash_field(&h, &carry, &field,
-					       key_def->parts[part_id].coll);
+		if (has_optional_parts && (field == NULL || field >= end)) {
+			total_size += tuple_hash_null(&h, &carry);
+		} else {
+			total_size +=
+				tuple_hash_field(&h, &carry, &field,
+						 key_def->parts[part_id].coll);
+		}
 		prev_fieldno = key_def->parts[part_id].fieldno;
 	}
 
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 5e38a424dd..84182e7626 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -360,7 +360,7 @@ vy_stmt_new_surrogate_from_key(const char *key, enum iproto_type type,
 	assert(part_count <= field_count);
 	uint32_t nulls_count = field_count - cmp_def->part_count;
 	uint32_t bsize = mp_sizeof_array(field_count) +
-		mp_sizeof_nil() * nulls_count;
+			 mp_sizeof_nil() * nulls_count;
 	for (uint32_t i = 0; i < part_count; ++i) {
 		const struct key_part *part = &cmp_def->parts[i];
 		assert(part->fieldno < field_count);
@@ -421,14 +421,25 @@ vy_stmt_new_surrogate_delete(struct tuple_format *format,
 
 	const char *src_pos = src_data;
 	uint32_t src_count = mp_decode_array(&src_pos);
-	uint32_t field_count = format->index_field_count;
-	assert(src_count >= field_count);
-	(void) src_count;
+	assert(src_count >= format->min_field_count);
+	uint32_t field_count;
+	if (src_count < format->index_field_count) {
+		field_count = src_count;
+		/*
+		 * Zero the field map so that absent key fields can
+		 * be detected in tuple_field() by a 0 offset.
+		 */
+		memset((char *)field_map - format->field_map_size, 0,
+		       format->field_map_size);
+	} else {
+		field_count = format->index_field_count;
+	}
 	char *pos = mp_encode_array(data, field_count);
 	for (uint32_t i = 0; i < field_count; ++i) {
 		const struct tuple_field *field = &format->fields[i];
 		if (! field->is_key_part) {
-			/* Unindexed field - write NIL */
+			/* Unindexed field - write NIL. */
+			assert(i < src_count);
 			pos = mp_encode_nil(pos);
 			mp_next(&src_pos);
 			continue;
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 201c62487d..a33739d659 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -687,7 +687,7 @@ vy_tuple_key_contains_null(const struct tuple *tuple, const struct key_def *def)
 {
 	for (uint32_t i = 0; i < def->part_count; ++i) {
 		const char *field = tuple_field(tuple, def->parts[i].fieldno);
-		if (mp_typeof(*field) == MP_NIL)
+		if (field == NULL || mp_typeof(*field) == MP_NIL)
 			return true;
 	}
 	return false;
diff --git a/test/engine/null.result b/test/engine/null.result
index 5c3d57ff04..4abf850213 100644
--- a/test/engine/null.result
+++ b/test/engine/null.result
@@ -871,3 +871,692 @@ sk:select{}
 s:drop()
 ---
 ...
+--
+-- gh-2988: allow absence of tail nullable indexed fields.
+--
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+---
+...
+-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath.
+s:replace{} -- Fail
+---
+- error: Tuple field count 0 is less than required by space format or defined indexes
+    (expected at least 1)
+...
+-- Compare full vs not full.
+s:replace{2}
+---
+- [2]
+...
+s:replace{1, 2}
+---
+- [1, 2]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+...
+sk:select{box.NULL}
+---
+- - [2]
+...
+sk:select{2}
+---
+- - [1, 2]
+...
+-- Compare not full vs full.
+s:replace{4, 5}
+---
+- [4, 5]
+...
+s:replace{3}
+---
+- [3]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+  - [3]
+  - [4, 5]
+...
+sk:select{box.NULL}
+---
+- - [2]
+  - [3]
+...
+sk:select{5}
+---
+- - [4, 5]
+...
+-- Compare extended keys.
+s:replace{7}
+---
+- [7]
+...
+s:replace{6}
+---
+- [6]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+  - [3]
+  - [4, 5]
+  - [6]
+  - [7]
+...
+sk:select{box.NULL}
+---
+- - [2]
+  - [3]
+  - [6]
+  - [7]
+...
+sk:select{}
+---
+- - [2]
+  - [3]
+  - [6]
+  - [7]
+  - [1, 2]
+  - [4, 5]
+...
+-- Test tuple extract key during dump for vinyl.
+box.snapshot()
+---
+- ok
+...
+sk:select{}
+---
+- - [2]
+  - [3]
+  - [6]
+  - [7]
+  - [1, 2]
+  - [4, 5]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+  - [3]
+  - [4, 5]
+  - [6]
+  - [7]
+...
+-- Test tuple_compare_sequential,
+-- tuple_compare_with_key_sequential.
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+parts = {}
+---
+...
+parts[1] = {1, 'unsigned'}
+---
+...
+parts[2] = {2, 'unsigned', is_nullable = true}
+---
+...
+parts[3] = {3, 'unsigned', is_nullable = true}
+---
+...
+sk = s:create_index('sk', {parts = parts})
+---
+...
+-- Compare full vs not full.
+s:replace{1, 2, 3}
+---
+- [1, 2, 3]
+...
+s:replace{3}
+---
+- [3]
+...
+s:replace{2, 3}
+---
+- [2, 3]
+...
+sk:select{}
+---
+- - [1, 2, 3]
+  - [2, 3]
+  - [3]
+...
+sk:select{3, box.NULL}
+---
+- - [3]
+...
+sk:select{3, box.NULL, box.NULL}
+---
+- - [3]
+...
+sk:select{2}
+---
+- - [2, 3]
+...
+sk:select{2, 3}
+---
+- - [2, 3]
+...
+sk:select{3, 100}
+---
+- []
+...
+sk:select{3, box.NULL, 100}
+---
+- []
+...
+sk:select({3, box.NULL}, {iterator = 'GE'})
+---
+- - [3]
+...
+sk:select({3, box.NULL}, {iterator = 'LE'})
+---
+- - [3]
+  - [2, 3]
+  - [1, 2, 3]
+...
+s:select{}
+---
+- - [1, 2, 3]
+  - [2, 3]
+  - [3]
+...
+-- Test tuple extract key for vinyl.
+box.snapshot()
+---
+- ok
+...
+sk:select{}
+---
+- - [1, 2, 3]
+  - [2, 3]
+  - [3]
+...
+sk:select{3, box.NULL}
+---
+- - [3]
+...
+sk:select{3, box.NULL, box.NULL}
+---
+- - [3]
+...
+sk:select{2}
+---
+- - [2, 3]
+...
+sk:select{2, 3}
+---
+- - [2, 3]
+...
+sk:select{3, 100}
+---
+- []
+...
+sk:select{3, box.NULL, 100}
+---
+- []
+...
+sk:select({3, box.NULL}, {iterator = 'GE'})
+---
+- - [3]
+...
+sk:select({3, box.NULL}, {iterator = 'LE'})
+---
+- - [3]
+  - [2, 3]
+  - [1, 2, 3]
+...
+-- Test tuple_compare_sequential() for the case when there are
+-- two equal tuples, but in one of them the field count is less
+-- than the unique field count.
+s:replace{1, box.NULL}
+---
+- [1, null]
+...
+s:replace{1, box.NULL, box.NULL}
+---
+- [1, null, null]
+...
+s:select{1}
+---
+- - [1, null, null]
+...
+--
+-- Partially sequential keys. See tuple_extract_key.cc and
+-- contains_sequential_parts template flag.
+--
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+parts = {}
+---
+...
+parts[1] = {2, 'unsigned', is_nullable = true}
+---
+...
+parts[2] = {3, 'unsigned', is_nullable = true}
+---
+...
+parts[3] = {5, 'unsigned', is_nullable = true}
+---
+...
+parts[4] = {6, 'unsigned', is_nullable = true}
+---
+...
+parts[5] = {4, 'unsigned', is_nullable = true}
+---
+...
+parts[6] = {7, 'unsigned', is_nullable = true}
+---
+...
+sk = s:create_index('sk', {parts = parts})
+---
+...
+s:insert{1, 1, 1, 1, 1, 1, 1}
+---
+- [1, 1, 1, 1, 1, 1, 1]
+...
+s:insert{8, 1, 1, 1, 1, box.NULL}
+---
+- [8, 1, 1, 1, 1, null]
+...
+s:insert{9, 1, 1, 1, box.NULL}
+---
+- [9, 1, 1, 1, null]
+...
+s:insert{6, 6}
+---
+- [6, 6]
+...
+s:insert{10, 6, box.NULL}
+---
+- [10, 6, null]
+...
+s:insert{2, 2, 2, 2, 2, 2}
+---
+- [2, 2, 2, 2, 2, 2]
+...
+s:insert{7}
+---
+- [7]
+...
+s:insert{5, 5, 5}
+---
+- [5, 5, 5]
+...
+s:insert{3, 5, box.NULL, box.NULL, box.NULL}
+---
+- [3, 5, null, null, null]
+...
+s:insert{4, 5, 5, 5, box.NULL}
+---
+- [4, 5, 5, 5, null]
+...
+s:insert{11, 4, 4, 4}
+---
+- [11, 4, 4, 4]
+...
+s:insert{12, 4, box.NULL, 4}
+---
+- [12, 4, null, 4]
+...
+s:insert{13, 3, 3, 3, 3}
+---
+- [13, 3, 3, 3, 3]
+...
+s:insert{14, box.NULL, 3, box.NULL, 3}
+---
+- [14, null, 3, null, 3]
+...
+s:select{}
+---
+- - [1, 1, 1, 1, 1, 1, 1]
+  - [2, 2, 2, 2, 2, 2]
+  - [3, 5, null, null, null]
+  - [4, 5, 5, 5, null]
+  - [5, 5, 5]
+  - [6, 6]
+  - [7]
+  - [8, 1, 1, 1, 1, null]
+  - [9, 1, 1, 1, null]
+  - [10, 6, null]
+  - [11, 4, 4, 4]
+  - [12, 4, null, 4]
+  - [13, 3, 3, 3, 3]
+  - [14, null, 3, null, 3]
+...
+sk:select{}
+---
+- - [7]
+  - [14, null, 3, null, 3]
+  - [9, 1, 1, 1, null]
+  - [8, 1, 1, 1, 1, null]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [2, 2, 2, 2, 2, 2]
+  - [13, 3, 3, 3, 3]
+  - [12, 4, null, 4]
+  - [11, 4, 4, 4]
+  - [3, 5, null, null, null]
+  - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+  - [6, 6]
+  - [10, 6, null]
+...
+sk:select{5, 5, box.NULL}
+---
+- - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+...
+sk:select{5, 5, box.NULL, 100}
+---
+- []
+...
+sk:select({7, box.NULL}, {iterator = 'LT'})
+---
+- - [10, 6, null]
+  - [6, 6]
+  - [4, 5, 5, 5, null]
+  - [5, 5, 5]
+  - [3, 5, null, null, null]
+  - [11, 4, 4, 4]
+  - [12, 4, null, 4]
+  - [13, 3, 3, 3, 3]
+  - [2, 2, 2, 2, 2, 2]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [8, 1, 1, 1, 1, null]
+  - [9, 1, 1, 1, null]
+  - [14, null, 3, null, 3]
+  - [7]
+...
+box.snapshot()
+---
+- ok
+...
+sk:select{}
+---
+- - [7]
+  - [14, null, 3, null, 3]
+  - [9, 1, 1, 1, null]
+  - [8, 1, 1, 1, 1, null]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [2, 2, 2, 2, 2, 2]
+  - [13, 3, 3, 3, 3]
+  - [12, 4, null, 4]
+  - [11, 4, 4, 4]
+  - [3, 5, null, null, null]
+  - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+  - [6, 6]
+  - [10, 6, null]
+...
+sk:select{5, 5, box.NULL}
+---
+- - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+...
+sk:select{5, 5, box.NULL, 100}
+---
+- []
+...
+sk:select({7, box.NULL}, {iterator = 'LT'})
+---
+- - [10, 6, null]
+  - [6, 6]
+  - [4, 5, 5, 5, null]
+  - [5, 5, 5]
+  - [3, 5, null, null, null]
+  - [11, 4, 4, 4]
+  - [12, 4, null, 4]
+  - [13, 3, 3, 3, 3]
+  - [2, 2, 2, 2, 2, 2]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [8, 1, 1, 1, 1, null]
+  - [9, 1, 1, 1, null]
+  - [14, null, 3, null, 3]
+  - [7]
+...
+s:drop()
+---
+...
+--
+-- The main case of absent nullable fields: create an index over
+-- them on a non-empty space (available in memtx only).
+--
+s = box.schema.space.create('test', {engine = 'memtx'})
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+s:replace{2}
+---
+- [2]
+...
+s:replace{3}
+---
+- [3]
+...
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+---
+...
+s:replace{4}
+---
+- [4]
+...
+s:replace{5, 6}
+---
+- [5, 6]
+...
+s:replace{7, 8}
+---
+- [7, 8]
+...
+s:replace{9, box.NULL}
+---
+- [9, null]
+...
+s:select{}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5, 6]
+  - [7, 8]
+  - [9, null]
+...
+sk:select{}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [9, null]
+  - [5, 6]
+  - [7, 8]
+...
+sk:select{box.NULL}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [9, null]
+...
+s:drop()
+---
+...
+--
+-- The complex case: when an index part's is_nullable is set to
+-- true and it changes min_field_count, this part must become
+-- optional and turn on comparators for optional fields. See the
+-- big comment in alter.cc in on_replace_dd_index().
+--
+s = box.schema.create_space('test', {engine = 'memtx'})
+---
+...
+pk = s:create_index('pk')
+---
+...
+sk = s:create_index('sk', {parts = {2, 'unsigned'}})
+---
+...
+s:replace{1, 1}
+---
+- [1, 1]
+...
+s:replace{2, box.NULL}
+---
+- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
+...
+s:select{}
+---
+- - [1, 1]
+...
+sk:alter({parts = {{2, 'unsigned', is_nullable = true}}})
+---
+...
+s:replace{20, box.NULL}
+---
+- [20, null]
+...
+sk:select{}
+---
+- - [20, null]
+  - [1, 1]
+...
+s:replace{10}
+---
+- [10]
+...
+sk:select{}
+---
+- - [10]
+  - [20, null]
+  - [1, 1]
+...
+s:replace{40}
+---
+- [40]
+...
+sk:select{}
+---
+- - [10]
+  - [20, null]
+  - [40]
+  - [1, 1]
+...
+s:drop()
+---
+...
+--
+-- Check that if an index alter makes a field optional, and this
+-- field is used in another index, then that other index is
+-- updated too. Case of @locker.
+--
+s = box.schema.space.create('test', {engine = 'memtx'})
+---
+...
+_ = s:create_index('pk')
+---
+...
+i1 = s:create_index('i1', {parts = {2, 'unsigned', 3, 'unsigned'}})
+---
+...
+i2 = s:create_index('i2', {parts = {3, 'unsigned', 2, 'unsigned'}})
+---
+...
+i1:alter{parts = {{2, 'unsigned'}, {3, 'unsigned', is_nullable = true}}}
+---
+...
+-- i2 alter makes i1 contain an optional part. Its key_def and
+-- comparators must be updated.
+i2:alter{parts = {{3, 'unsigned', is_nullable = true}, {2, 'unsigned'}}}
+---
+...
+s:insert{1, 1}
+---
+- [1, 1]
+...
+s:insert{100, 100}
+---
+- [100, 100]
+...
+s:insert{50, 50}
+---
+- [50, 50]
+...
+s:insert{25, 25, 25}
+---
+- [25, 25, 25]
+...
+s:insert{75, 75, 75}
+---
+- [75, 75, 75]
+...
+s:select{}
+---
+- - [1, 1]
+  - [25, 25, 25]
+  - [50, 50]
+  - [75, 75, 75]
+  - [100, 100]
+...
+i1:select{}
+---
+- - [1, 1]
+  - [25, 25, 25]
+  - [50, 50]
+  - [75, 75, 75]
+  - [100, 100]
+...
+i2:select{}
+---
+- - [1, 1]
+  - [50, 50]
+  - [100, 100]
+  - [25, 25, 25]
+  - [75, 75, 75]
+...
+i2:select{box.NULL, 50}
+---
+- - [50, 50]
+...
+i2:select{}
+---
+- - [1, 1]
+  - [50, 50]
+  - [100, 100]
+  - [25, 25, 25]
+  - [75, 75, 75]
+...
+s:drop()
+---
+...
diff --git a/test/engine/null.test.lua b/test/engine/null.test.lua
index 777a847f0d..7f5a7dd332 100644
--- a/test/engine/null.test.lua
+++ b/test/engine/null.test.lua
@@ -277,3 +277,187 @@ s:replace{10, box.NULL}
 s:replace{150, box.NULL}
 sk:select{}
 s:drop()
+
+--
+-- gh-2988: allow absence of tail nullable indexed fields.
+--
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+
+-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath.
+
+s:replace{} -- Fail
+-- Compare full vs not full.
+s:replace{2}
+s:replace{1, 2}
+s:select{}
+sk:select{box.NULL}
+sk:select{2}
+-- Compare not full vs full.
+s:replace{4, 5}
+s:replace{3}
+s:select{}
+sk:select{box.NULL}
+sk:select{5}
+-- Compare extended keys.
+s:replace{7}
+s:replace{6}
+s:select{}
+sk:select{box.NULL}
+sk:select{}
+-- Test tuple extract key during dump for vinyl.
+box.snapshot()
+sk:select{}
+s:select{}
+
+-- Test tuple_compare_sequential,
+-- tuple_compare_with_key_sequential.
+s:drop()
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+parts = {}
+parts[1] = {1, 'unsigned'}
+parts[2] = {2, 'unsigned', is_nullable = true}
+parts[3] = {3, 'unsigned', is_nullable = true}
+sk = s:create_index('sk', {parts = parts})
+-- Compare full vs not full.
+s:replace{1, 2, 3}
+s:replace{3}
+s:replace{2, 3}
+sk:select{}
+sk:select{3, box.NULL}
+sk:select{3, box.NULL, box.NULL}
+sk:select{2}
+sk:select{2, 3}
+sk:select{3, 100}
+sk:select{3, box.NULL, 100}
+sk:select({3, box.NULL}, {iterator = 'GE'})
+sk:select({3, box.NULL}, {iterator = 'LE'})
+s:select{}
+-- Test tuple extract key for vinyl.
+box.snapshot()
+sk:select{}
+sk:select{3, box.NULL}
+sk:select{3, box.NULL, box.NULL}
+sk:select{2}
+sk:select{2, 3}
+sk:select{3, 100}
+sk:select{3, box.NULL, 100}
+sk:select({3, box.NULL}, {iterator = 'GE'})
+sk:select({3, box.NULL}, {iterator = 'LE'})
+
+-- Test tuple_compare_sequential() for the case when there are
+-- two equal tuples, but in one of them the field count is less
+-- than the unique field count.
+s:replace{1, box.NULL}
+s:replace{1, box.NULL, box.NULL}
+s:select{1}
+
+--
+-- Partially sequential keys. See tuple_extract_key.cc and
+-- contains_sequential_parts template flag.
+--
+s:drop()
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+parts = {}
+parts[1] = {2, 'unsigned', is_nullable = true}
+parts[2] = {3, 'unsigned', is_nullable = true}
+parts[3] = {5, 'unsigned', is_nullable = true}
+parts[4] = {6, 'unsigned', is_nullable = true}
+parts[5] = {4, 'unsigned', is_nullable = true}
+parts[6] = {7, 'unsigned', is_nullable = true}
+sk = s:create_index('sk', {parts = parts})
+s:insert{1, 1, 1, 1, 1, 1, 1}
+s:insert{8, 1, 1, 1, 1, box.NULL}
+s:insert{9, 1, 1, 1, box.NULL}
+s:insert{6, 6}
+s:insert{10, 6, box.NULL}
+s:insert{2, 2, 2, 2, 2, 2}
+s:insert{7}
+s:insert{5, 5, 5}
+s:insert{3, 5, box.NULL, box.NULL, box.NULL}
+s:insert{4, 5, 5, 5, box.NULL}
+s:insert{11, 4, 4, 4}
+s:insert{12, 4, box.NULL, 4}
+s:insert{13, 3, 3, 3, 3}
+s:insert{14, box.NULL, 3, box.NULL, 3}
+s:select{}
+sk:select{}
+sk:select{5, 5, box.NULL}
+sk:select{5, 5, box.NULL, 100}
+sk:select({7, box.NULL}, {iterator = 'LT'})
+box.snapshot()
+sk:select{}
+sk:select{5, 5, box.NULL}
+sk:select{5, 5, box.NULL, 100}
+sk:select({7, box.NULL}, {iterator = 'LT'})
+
+s:drop()
+
+--
+-- The main case of absent nullable fields: create an index over
+-- them on a non-empty space (available in memtx only).
+--
+s = box.schema.space.create('test', {engine = 'memtx'})
+pk = s:create_index('pk')
+s:replace{1}
+s:replace{2}
+s:replace{3}
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+s:replace{4}
+s:replace{5, 6}
+s:replace{7, 8}
+s:replace{9, box.NULL}
+s:select{}
+sk:select{}
+sk:select{box.NULL}
+s:drop()
+
+--
+-- The complex case: when an index part's is_nullable is set to
+-- true and it changes min_field_count, this part must become
+-- optional and turn on comparators for optional fields. See the
+-- big comment in alter.cc in on_replace_dd_index().
+--
+s = box.schema.create_space('test', {engine = 'memtx'})
+pk = s:create_index('pk')
+sk = s:create_index('sk', {parts = {2, 'unsigned'}})
+s:replace{1, 1}
+s:replace{2, box.NULL}
+s:select{}
+sk:alter({parts = {{2, 'unsigned', is_nullable = true}}})
+s:replace{20, box.NULL}
+sk:select{}
+s:replace{10}
+sk:select{}
+s:replace{40}
+sk:select{}
+s:drop()
+
+--
+-- Check that if an index alter makes a field optional, and this
+-- field is used in another index, then that other index is
+-- updated too. Case of @locker.
+--
+s = box.schema.space.create('test', {engine = 'memtx'})
+_ = s:create_index('pk')
+i1 = s:create_index('i1', {parts = {2, 'unsigned', 3, 'unsigned'}})
+i2 = s:create_index('i2', {parts = {3, 'unsigned', 2, 'unsigned'}})
+
+i1:alter{parts = {{2, 'unsigned'}, {3, 'unsigned', is_nullable = true}}}
+-- i2 alter makes i1 contain an optional part. Its key_def and
+-- comparators must be updated.
+i2:alter{parts = {{3, 'unsigned', is_nullable = true}, {2, 'unsigned'}}}
+s:insert{1, 1}
+s:insert{100, 100}
+s:insert{50, 50}
+s:insert{25, 25, 25}
+s:insert{75, 75, 75}
+s:select{}
+i1:select{}
+i2:select{}
+i2:select{box.NULL, 50}
+i2:select{}
+s:drop()
-- 
GitLab