From 2c7c3e44152ba16665393c95c7726951a4a121a4 Mon Sep 17 00:00:00 2001
From: Kirill Shcherbatov <kshcherbatov@tarantool.org>
Date: Tue, 18 Jun 2019 18:52:51 +0300
Subject: [PATCH] box: introduce key_def->is_multikey flag

Previously only key definitions that have JSON paths were able
to define multikey index. We used to check multikey_path != NULL
test to determine whether given key definition is multikey.
In further patches with functional indexes this rule becomes
outdated. Functional index extracted key definition may be
multikey, but has no JSON paths.
So an explicit is_multikey flag was introduced.

Needed for #1260
---
 src/box/index_def.c          |  2 +-
 src/box/key_def.c            |  2 ++
 src/box/key_def.h            | 12 ++++--------
 src/box/memtx_bitset.c       |  2 +-
 src/box/memtx_engine.c       |  3 +--
 src/box/memtx_rtree.c        |  2 +-
 src/box/memtx_space.c        |  6 +++---
 src/box/memtx_tree.c         |  4 ++--
 src/box/tuple.c              |  2 +-
 src/box/tuple_bloom.c        |  4 ++--
 src/box/tuple_compare.cc     | 22 +++++++++++-----------
 src/box/tuple_extract_key.cc | 12 ++++++------
 src/box/tuple_hash.cc        |  6 +++---
 src/box/vinyl.c              |  5 ++---
 src/box/vy_stmt.h            |  6 +++---
 15 files changed, 43 insertions(+), 47 deletions(-)

diff --git a/src/box/index_def.c b/src/box/index_def.c
index 28de89274e..eb309a30c6 100644
--- a/src/box/index_def.c
+++ b/src/box/index_def.c
@@ -291,7 +291,7 @@ index_def_is_valid(struct index_def *index_def, const char *space_name)
 			 space_name, "too many key parts");
 		return false;
 	}
-	if (index_def->iid == 0 && key_def_is_multikey(index_def->key_def)) {
+	if (index_def->iid == 0 && index_def->key_def->is_multikey) {
 		diag_set(ClientError, ER_MODIFY_INDEX, index_def->name,
 			 space_name, "primary key cannot be multikey");
 		return false;
diff --git a/src/box/key_def.c b/src/box/key_def.c
index eebfb7fe4c..2aa095091e 100644
--- a/src/box/key_def.c
+++ b/src/box/key_def.c
@@ -181,6 +181,7 @@ key_def_set_part_path(struct key_def *def, uint32_t part_no, const char *path,
 		def->multikey_path = part->path;
 		def->multikey_fieldno = part->fieldno;
 		def->multikey_path_len = (uint32_t) multikey_path_len;
+		def->is_multikey = true;
 	} else if (def->multikey_fieldno != part->fieldno ||
 		   json_path_cmp(path, multikey_path_len, def->multikey_path,
 				 def->multikey_path_len,
@@ -752,6 +753,7 @@ key_def_merge(const struct key_def *first, const struct key_def *second)
 	new_def->is_nullable = first->is_nullable || second->is_nullable;
 	new_def->has_optional_parts = first->has_optional_parts ||
 				      second->has_optional_parts;
+	new_def->is_multikey = first->is_multikey || second->is_multikey;
 
 	/* JSON paths data in the new key_def. */
 	char *path_pool = (char *)new_def + key_def_sizeof(new_part_count, 0);
diff --git a/src/box/key_def.h b/src/box/key_def.h
index f4a1a8fd1e..ab4b7c0878 100644
--- a/src/box/key_def.h
+++ b/src/box/key_def.h
@@ -196,6 +196,8 @@ struct key_def {
 	bool is_nullable;
 	/** True if some key part has JSON path. */
 	bool has_json_paths;
+	/** True if it is multikey key definition. */
+	bool is_multikey;
 	/**
 	 * True, if some key parts can be absent in a tuple. These
 	 * fields assumed to be MP_NIL.
@@ -220,14 +222,14 @@ struct key_def {
 	const char *multikey_path;
 	/**
 	 * The length of the key_def::multikey_path.
-	 * Valid when key_def_is_multikey(key_def) is true,
+	 * Valid when key_def->is_multikey is true,
 	 * undefined otherwise.
 	 */
 	uint32_t multikey_path_len;
 	/**
 	 * The index of the root field of the multikey JSON
 	 * path index key_def::multikey_path.
-	 * Valid when key_def_is_multikey(key_def) is true,
+	 * Valid when key_def->is_multikey is true,
 	 * undefined otherwise.
 	*/
 	uint32_t multikey_fieldno;
@@ -469,12 +471,6 @@ key_def_is_sequential(const struct key_def *key_def)
 	return true;
 }
 
-static inline bool
-key_def_is_multikey(const struct key_def *key_def)
-{
-	return key_def->multikey_path != NULL;
-}
-
 /**
  * Return true if @a key_def defines has fields that requires
  * special collation comparison.
diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c
index 8c626684ce..59bc96427b 100644
--- a/src/box/memtx_bitset.c
+++ b/src/box/memtx_bitset.c
@@ -276,7 +276,7 @@ memtx_bitset_index_replace(struct index *base, struct tuple *old_tuple,
 	struct memtx_bitset_index *index = (struct memtx_bitset_index *)base;
 
 	assert(!base->def->opts.is_unique);
-	assert(!key_def_is_multikey(base->def->key_def));
+	assert(!base->def->key_def->is_multikey);
 	assert(old_tuple != NULL || new_tuple != NULL);
 	(void) mode;
 
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 869cd343a4..428491c2d8 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -1251,7 +1251,6 @@ memtx_index_def_change_requires_rebuild(struct index *index,
 				  TUPLE_INDEX_BASE) != 0)
 			return true;
 	}
-	assert(key_def_is_multikey(old_cmp_def) ==
-	       key_def_is_multikey(new_cmp_def));
+	assert(old_cmp_def->is_multikey == new_cmp_def->is_multikey);
 	return false;
 }
diff --git a/src/box/memtx_rtree.c b/src/box/memtx_rtree.c
index b91a405d76..8badad797b 100644
--- a/src/box/memtx_rtree.c
+++ b/src/box/memtx_rtree.c
@@ -120,7 +120,7 @@ extract_rectangle(struct rtree_rect *rect, struct tuple *tuple,
 		  struct index_def *index_def)
 {
 	assert(index_def->key_def->part_count == 1);
-	assert(!key_def_is_multikey(index_def->key_def));
+	assert(!index_def->key_def->is_multikey);
 	const char *elems = tuple_field_by_part(tuple,
 				index_def->key_def->parts, MULTIKEY_NONE);
 	unsigned dimension = index_def->opts.dimension;
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 09277b0295..a8e7b8080c 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -653,7 +653,7 @@ memtx_space_check_index_def(struct space *space, struct index_def *index_def)
 				 "HASH index must be unique");
 			return -1;
 		}
-		if (key_def_is_multikey(index_def->key_def)) {
+		if (index_def->key_def->is_multikey) {
 			diag_set(ClientError, ER_MODIFY_INDEX,
 				 index_def->name, space_name(space),
 				 "HASH index cannot be multikey");
@@ -682,7 +682,7 @@ memtx_space_check_index_def(struct space *space, struct index_def *index_def)
 				 "RTREE index field type must be ARRAY");
 			return -1;
 		}
-		if (key_def_is_multikey(index_def->key_def)) {
+		if (index_def->key_def->is_multikey) {
 			diag_set(ClientError, ER_MODIFY_INDEX,
 				 index_def->name, space_name(space),
 				 "RTREE index cannot be multikey");
@@ -710,7 +710,7 @@ memtx_space_check_index_def(struct space *space, struct index_def *index_def)
 				 "BITSET index field type must be NUM or STR");
 			return -1;
 		}
-		if (key_def_is_multikey(index_def->key_def)) {
+		if (index_def->key_def->is_multikey) {
 			diag_set(ClientError, ER_MODIFY_INDEX,
 				 index_def->name, space_name(space),
 				 "BITSET index cannot be multikey");
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 268ce4050f..3edf94044d 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -882,7 +882,7 @@ memtx_tree_index_end_build(struct index *base)
 	struct key_def *cmp_def = memtx_tree_cmp_def(&index->tree);
 	qsort_arg(index->build_array, index->build_array_size,
 		  sizeof(index->build_array[0]), memtx_tree_qcompare, cmp_def);
-	if (key_def_is_multikey(cmp_def)) {
+	if (cmp_def->is_multikey) {
 		/*
 		 * Multikey index may have equal(in terms of
 		 * cmp_def) keys inserted by different multikey
@@ -1027,7 +1027,7 @@ memtx_tree_index_new(struct memtx_engine *memtx, struct index_def *def)
 			 "malloc", "struct memtx_tree_index");
 		return NULL;
 	}
-	const struct index_vtab *vtab = key_def_is_multikey(def->key_def) ?
+	const struct index_vtab *vtab = def->key_def->is_multikey ?
 					&memtx_tree_index_multikey_vtab :
 					&memtx_tree_index_vtab;
 	if (index_create(&index->base, (struct engine *)memtx,
diff --git a/src/box/tuple.c b/src/box/tuple.c
index a7ef332b2f..c0e94d55b6 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -553,7 +553,7 @@ tuple_raw_multikey_count(struct tuple_format *format, const char *data,
 			       const uint32_t *field_map,
 			       struct key_def *key_def)
 {
-	assert(key_def_is_multikey(key_def));
+	assert(key_def->is_multikey);
 	const char *array_raw =
 		tuple_field_raw_by_path(format, data, field_map,
 					key_def->multikey_fieldno,
diff --git a/src/box/tuple_bloom.c b/src/box/tuple_bloom.c
index 8b74da22c8..420a7c60ea 100644
--- a/src/box/tuple_bloom.c
+++ b/src/box/tuple_bloom.c
@@ -109,7 +109,7 @@ tuple_bloom_builder_add(struct tuple_bloom_builder *builder,
 			int multikey_idx)
 {
 	assert(builder->part_count == key_def->part_count);
-	assert(!key_def_is_multikey(key_def) || multikey_idx != MULTIKEY_NONE);
+	assert(!key_def->is_multikey || multikey_idx != MULTIKEY_NONE);
 
 	uint32_t h = HASH_SEED;
 	uint32_t carry = 0;
@@ -204,7 +204,7 @@ bool
 tuple_bloom_maybe_has(const struct tuple_bloom *bloom, struct tuple *tuple,
 		      struct key_def *key_def, int multikey_idx)
 {
-	assert(!key_def_is_multikey(key_def) || multikey_idx != MULTIKEY_NONE);
+	assert(!key_def->is_multikey || multikey_idx != MULTIKEY_NONE);
 
 	if (bloom->is_legacy) {
 		return bloom_maybe_has(&bloom->parts[0],
diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index 95a0f58c94..c03b584d02 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -460,9 +460,9 @@ tuple_compare_slowpath(struct tuple *tuple_a, hint_t tuple_a_hint,
 	assert(!has_optional_parts || is_nullable);
 	assert(is_nullable == key_def->is_nullable);
 	assert(has_optional_parts == key_def->has_optional_parts);
-	assert(key_def_is_multikey(key_def) == is_multikey);
-	assert(!is_multikey || (is_multikey == has_json_paths &&
-		tuple_a_hint != HINT_NONE && tuple_b_hint != HINT_NONE));
+	assert(key_def->is_multikey == is_multikey);
+	assert(!is_multikey || (tuple_a_hint != HINT_NONE &&
+		tuple_b_hint != HINT_NONE));
 	int rc = 0;
 	if (!is_multikey && (rc = hint_cmp(tuple_a_hint, tuple_b_hint)) != 0)
 		return rc;
@@ -619,9 +619,9 @@ tuple_compare_with_key_slowpath(struct tuple *tuple, hint_t tuple_hint,
 	assert(has_optional_parts == key_def->has_optional_parts);
 	assert(key != NULL || part_count == 0);
 	assert(part_count <= key_def->part_count);
-	assert(key_def_is_multikey(key_def) == is_multikey);
-	assert(!is_multikey || (is_multikey == has_json_paths &&
-		tuple_hint != HINT_NONE && key_hint == HINT_NONE));
+	assert(key_def->is_multikey == is_multikey);
+	assert(!is_multikey || (tuple_hint != HINT_NONE &&
+		key_hint == HINT_NONE));
 	int rc = 0;
 	if (!is_multikey && (rc = hint_cmp(tuple_hint, key_hint)) != 0)
 		return rc;
@@ -1573,7 +1573,7 @@ template <enum field_type type, bool is_nullable>
 static hint_t
 key_hint(const char *key, uint32_t part_count, struct key_def *key_def)
 {
-	assert(!key_def_is_multikey(key_def));
+	assert(!key_def->is_multikey);
 	if (part_count == 0)
 		return HINT_NONE;
 	return field_hint<type, is_nullable>(key, key_def->parts->coll);
@@ -1583,7 +1583,7 @@ template <enum field_type type, bool is_nullable>
 static hint_t
 tuple_hint(struct tuple *tuple, struct key_def *key_def)
 {
-	assert(!key_def_is_multikey(key_def));
+	assert(!key_def->is_multikey);
 	const char *field = tuple_field_by_part(tuple, key_def->parts,
 						MULTIKEY_NONE);
 	if (is_nullable && field == NULL)
@@ -1607,7 +1607,7 @@ key_hint_multikey(const char *key, uint32_t part_count, struct key_def *key_def)
 	 * do nothing on key hint calculation an it is valid
 	 * because it is never used(unlike tuple hint).
 	 */
-	assert(key_def_is_multikey(key_def));
+	assert(key_def->is_multikey);
 	return HINT_NONE;
 }
 
@@ -1641,7 +1641,7 @@ key_def_set_hint_func(struct key_def *def)
 static void
 key_def_set_hint_func(struct key_def *def)
 {
-	if (key_def_is_multikey(def)) {
+	if (def->is_multikey) {
 		def->key_hint = key_hint_multikey;
 		def->tuple_hint = tuple_hint_multikey;
 		return;
@@ -1756,7 +1756,7 @@ static void
 key_def_set_compare_func_json(struct key_def *def)
 {
 	assert(def->has_json_paths);
-	if (key_def_is_multikey(def)) {
+	if (def->is_multikey) {
 		def->tuple_compare = tuple_compare_slowpath
 				<is_nullable, has_optional_parts, true, true>;
 		def->tuple_compare_with_key = tuple_compare_with_key_slowpath
diff --git a/src/box/tuple_extract_key.cc b/src/box/tuple_extract_key.cc
index 3bd3cde70b..471c7df808 100644
--- a/src/box/tuple_extract_key.cc
+++ b/src/box/tuple_extract_key.cc
@@ -118,8 +118,8 @@ tuple_extract_key_slowpath(struct tuple *tuple, struct key_def *key_def,
 	assert(has_optional_parts == key_def->has_optional_parts);
 	assert(contains_sequential_parts ==
 	       key_def_contains_sequential_parts(key_def));
-	assert(is_multikey == key_def_is_multikey(key_def));
-	assert(!key_def_is_multikey(key_def) || multikey_idx != MULTIKEY_NONE);
+	assert(is_multikey == key_def->is_multikey);
+	assert(!key_def->is_multikey || multikey_idx != MULTIKEY_NONE);
 	assert(mp_sizeof_nil() == 1);
 	const char *data = tuple_data(tuple);
 	uint32_t part_count = key_def->part_count;
@@ -250,7 +250,7 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 	assert(has_json_paths == key_def->has_json_paths);
 	assert(!has_optional_parts || key_def->is_nullable);
 	assert(has_optional_parts == key_def->has_optional_parts);
-	assert(!key_def_is_multikey(key_def) || multikey_idx != MULTIKEY_NONE);
+	assert(!key_def->is_multikey || multikey_idx != MULTIKEY_NONE);
 	assert(mp_sizeof_nil() == 1);
 	/* allocate buffer with maximal possible size */
 	char *key = (char *) region_alloc(&fiber()->gc, data_end - data);
@@ -366,7 +366,7 @@ static void
 key_def_set_extract_func_plain(struct key_def *def)
 {
 	assert(!def->has_json_paths);
-	assert(!key_def_is_multikey(def));
+	assert(!def->is_multikey);
 	if (key_def_is_sequential(def)) {
 		assert(contains_sequential_parts || def->part_count == 1);
 		def->tuple_extract_key = tuple_extract_key_sequential
@@ -387,7 +387,7 @@ static void
 key_def_set_extract_func_json(struct key_def *def)
 {
 	assert(def->has_json_paths);
-	if (key_def_is_multikey(def)) {
+	if (def->is_multikey) {
 		def->tuple_extract_key = tuple_extract_key_slowpath
 					<contains_sequential_parts,
 					 has_optional_parts, true, true>;
@@ -452,7 +452,7 @@ tuple_key_contains_null(struct tuple *tuple, struct key_def *def,
 int
 tuple_validate_key_parts(struct key_def *key_def, struct tuple *tuple)
 {
-	assert(!key_def_is_multikey(key_def));
+	assert(!key_def->is_multikey);
 	for (uint32_t idx = 0; idx < key_def->part_count; idx++) {
 		struct key_part *part = &key_def->parts[idx];
 		const char *field = tuple_field_by_part(tuple, part,
diff --git a/src/box/tuple_hash.cc b/src/box/tuple_hash.cc
index 223c5f6f45..780e3d0536 100644
--- a/src/box/tuple_hash.cc
+++ b/src/box/tuple_hash.cc
@@ -155,7 +155,7 @@ struct TupleHash
 {
 	static uint32_t hash(struct tuple *tuple, struct key_def *key_def)
 	{
-		assert(!key_def_is_multikey(key_def));
+		assert(!key_def->is_multikey);
 		uint32_t h = HASH_SEED;
 		uint32_t carry = 0;
 		uint32_t total_size = 0;
@@ -172,7 +172,7 @@ template <>
 struct TupleHash<FIELD_TYPE_UNSIGNED> {
 	static uint32_t	hash(struct tuple *tuple, struct key_def *key_def)
 	{
-		assert(!key_def_is_multikey(key_def));
+		assert(!key_def->is_multikey);
 		const char *field = tuple_field_by_part(tuple,
 						key_def->parts,
 						MULTIKEY_NONE);
@@ -364,7 +364,7 @@ tuple_hash_slowpath(struct tuple *tuple, struct key_def *key_def)
 {
 	assert(has_json_paths == key_def->has_json_paths);
 	assert(has_optional_parts == key_def->has_optional_parts);
-	assert(!key_def_is_multikey(key_def));
+	assert(!key_def->is_multikey);
 	uint32_t h = HASH_SEED;
 	uint32_t carry = 0;
 	uint32_t total_size = 0;
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 5c0114ac67..f68958ba19 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1019,8 +1019,7 @@ vinyl_index_def_change_requires_rebuild(struct index *index,
 				  TUPLE_INDEX_BASE) != 0)
 			return true;
 	}
-	assert(key_def_is_multikey(old_cmp_def) ==
-	       key_def_is_multikey(new_cmp_def));
+	assert(old_cmp_def->is_multikey == new_cmp_def->is_multikey);
 	return false;
 }
 
@@ -1643,7 +1642,7 @@ vy_check_is_unique_secondary(struct vy_tx *tx, const struct vy_read_view **rv,
 {
 	if (!lsm->check_is_unique)
 		return 0;
-	if (!key_def_is_multikey(lsm->cmp_def)) {
+	if (!lsm->cmp_def->is_multikey) {
 		return vy_check_is_unique_secondary_one(tx, rv,
 				space_name, index_name, lsm, stmt,
 				MULTIKEY_NONE);
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index a80456add0..25219230d0 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -696,7 +696,7 @@ vy_stmt_str(struct tuple *stmt);
 static inline int
 vy_entry_multikey_idx(struct vy_entry entry, struct key_def *key_def)
 {
-	if (!key_def_is_multikey(key_def) || vy_stmt_is_key(entry.stmt))
+	if (!key_def->is_multikey || vy_stmt_is_key(entry.stmt))
 		return MULTIKEY_NONE;
 	assert(entry.hint != HINT_NONE);
 	return (int)entry.hint;
@@ -766,11 +766,11 @@ vy_entry_compare_with_raw_key(struct vy_entry entry,
  */
 #define vy_stmt_foreach_entry(entry, src_stmt, key_def)			\
 	for (uint32_t multikey_idx = 0,					\
-	     multikey_count = !key_def_is_multikey((key_def)) ? 1 :	\
+	     multikey_count = !(key_def)->is_multikey ? 1 :		\
 			tuple_multikey_count((src_stmt), (key_def));	\
 	     multikey_idx < multikey_count &&				\
 	     (((entry).stmt = (src_stmt)),				\
-	      ((entry).hint = !key_def_is_multikey((key_def)) ?		\
+	      ((entry).hint = !(key_def)->is_multikey ?			\
 			vy_stmt_hint((src_stmt), (key_def)) :		\
 			multikey_idx), true);				\
 	     ++multikey_idx)
-- 
GitLab