From 6cd463e1d851a256207d47038268fafea4a83061 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov@tarantool.org>
Date: Fri, 8 Jul 2022 14:03:41 +0300
Subject: [PATCH] memtx: allocate functional index key parts as tuples

Functional index keys are allocated and freed with MemtxAllocator's
alloc and free methods. In contrast to tuples, which are allocated and
freed with alloc_tuple and free_tuple, freeing a functional index key
happens immediately, irrespective of whether there's a snapshot in
progress or not. This is acceptable because a snapshot only uses primary
indexes, which can't be functional. However, to reuse the snapshot
infrastructure for creating general purpose user read views, we will
need to guarantee that functional index keys stay alive until all read
views using them are closed.

To achieve that, this commit turns functional index keys into tuples,
which automatically makes them linger if there's an open read view.
We use the same global tuple format for allocating functional keys,
because the key format is checked in key_list_iterator_next.

Closes #7376

NO_DOC=refactoring
NO_TEST=refactoring
NO_CHANGELOG=refactoring
---
 src/box/key_list.c        |  28 ++++++----
 src/box/key_list.h        |  24 +++------
 src/box/memtx_engine.cc   |   9 ++++
 src/box/memtx_engine.h    |   6 ++-
 src/box/memtx_tree.cc     | 110 +++++++++++++-------------------------
 src/box/tuple_compare.cc  |   6 +--
 test/wal_off/alter.result |   2 +-
 7 files changed, 81 insertions(+), 104 deletions(-)

diff --git a/src/box/key_list.c b/src/box/key_list.c
index 4779d2dd9b..25de879ca7 100644
--- a/src/box/key_list.c
+++ b/src/box/key_list.c
@@ -40,16 +40,17 @@
 #include "port.h"
 #include "schema.h"
 #include "tt_static.h"
+#include "tuple.h"
 
 int
 key_list_iterator_create(struct key_list_iterator *it, struct tuple *tuple,
 			 struct index_def *index_def, bool validate,
-			 key_list_allocator_t key_allocator)
+			 struct tuple_format *format)
 {
 	it->index_def = index_def;
 	it->validate = validate;
 	it->tuple = tuple;
-	it->key_allocator = key_allocator;
+	it->format = format;
 
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
@@ -116,7 +117,7 @@ key_list_iterator_create(struct key_list_iterator *it, struct tuple *tuple,
 }
 
 int
-key_list_iterator_next(struct key_list_iterator *it, const char **value)
+key_list_iterator_next(struct key_list_iterator *it, struct tuple **value)
 {
 	assert(it->data <= it->data_end);
 	if (it->data == it->data_end) {
@@ -124,15 +125,16 @@ key_list_iterator_next(struct key_list_iterator *it, const char **value)
 		return 0;
 	}
 	const char *key = it->data;
+	const char *key_end;
 	if (!it->validate) {
 		/*
 		 * Valid key is a MP_ARRAY, so just go to the
 		 * next key via mp_next().
 		 */
-		mp_next(&it->data);
-		assert(it->data <= it->data_end);
-		*value = it->key_allocator(key, it->data - key);
-		return *value != NULL ? 0 : -1;
+		key_end = it->data;
+		mp_next(&key_end);
+		assert(key_end <= it->data_end);
+		goto out;
 	}
 
 	if (mp_typeof(*key) != MP_ARRAY) {
@@ -162,7 +164,6 @@ key_list_iterator_next(struct key_list_iterator *it, const char **value)
 				   key_def->part_count, part_count));
 		return -1;
 	}
-	const char *key_end;
 	if (key_validate_parts(key_def, rptr, part_count, true,
 			       &key_end) != 0) {
 		struct space *space = space_by_id(it->index_def->space_id);
@@ -175,8 +176,13 @@ key_list_iterator_next(struct key_list_iterator *it, const char **value)
 			 "key does not follow functional index definition");
 		return -1;
 	}
-
+out:
 	it->data = key_end;
-	*value = it->key_allocator(key, key_end - key);
-	return *value != NULL ? 0 : -1;
+	*value = tuple_new(it->format, key, key_end);
+	if (*value == NULL)
+		return -1;
+	if (tuple_has_flag(it->tuple, TUPLE_IS_TEMPORARY))
+		tuple_set_flag(*value, TUPLE_IS_TEMPORARY);
+	tuple_bless(*value);
+	return 0;
 }
diff --git a/src/box/key_list.h b/src/box/key_list.h
index 12e6938a23..9df3103aad 100644
--- a/src/box/key_list.h
+++ b/src/box/key_list.h
@@ -39,17 +39,7 @@ extern "C" {
 
 struct index_def;
 struct tuple;
-
-/**
- * We iterate over a functional index key list in two cases: when
- * adding a new tuple or removing the old one. When adding a new
- * tuple we need to copy functional index data to tuple memory, so
- * provide an allocator for that. When removing an old index, it's
- * fine to use a temporary memory area for the functional index
- * key, since the key is only used to lookup the old tuple in the
- * b+* tree, so we pass in a dummy allocator.
- */
-typedef const char *(*key_list_allocator_t)(const char *key, uint32_t key_sz);
+struct tuple_format;
 
 /**
  * An iterator over key_data returned by a stored function function.
@@ -76,8 +66,8 @@ struct key_list_iterator {
 	const char *data_end;
 	/** Whether the iterator must validate processed keys. */
 	bool validate;
-	/** Allocate a key copy before returning it. */
-	key_list_allocator_t key_allocator;
+	/** Format used for allocating keys. Not referenced. */
+	struct tuple_format *format;
 };
 
 /**
@@ -86,6 +76,7 @@ struct key_list_iterator {
  * Executes a function specified in the given functional index
  * definition and initializes a new iterator over the MsgPack
  * array with keys. Each key is a nested MsgPack array.
+ * Keys are allocated using the supplied tuple format.
  *
  * When validate flag is set, each array entry is validated
  * to match the given functional index key definition.
@@ -97,17 +88,18 @@ struct key_list_iterator {
 int
 key_list_iterator_create(struct key_list_iterator *it, struct tuple *tuple,
 			 struct index_def *index_def, bool validate,
-			 key_list_allocator_t key_allocator);
+			 struct tuple_format *format);
 
 /**
  * Return the next key and advance the iterator state.
- * If the iterator is exhausted, the value is set to 0.
+ * If the iterator is exhausted, the value is set to NULL.
+ * The new tuple is pinned with tuple_bless().
  *
  * @retval 0 on success or EOF.
  * @retval -1 on error; diag is set.
  */
 int
-key_list_iterator_next(struct key_list_iterator *it, const char **value);
+key_list_iterator_next(struct key_list_iterator *it, struct tuple **value);
 
 #ifdef __cplusplus
 } /* extern "C" */
diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc
index 7276662ba8..d8ce201437 100644
--- a/src/box/memtx_engine.cc
+++ b/src/box/memtx_engine.cc
@@ -238,6 +238,7 @@ memtx_engine_shutdown(struct engine *engine)
 	tuple_arena_destroy(&memtx->arena);
 
 	xdir_destroy(&memtx->snap_dir);
+	tuple_format_unref(memtx->func_key_format);
 	free(memtx);
 }
 
@@ -1363,6 +1364,14 @@ memtx_engine_new(const char *snap_dirname, bool force_recovery,
 	memtx->base.vtab = &memtx_engine_vtab;
 	memtx->base.name = "memtx";
 
+	memtx->func_key_format = simple_tuple_format_new(
+		&memtx_tuple_format_vtab, &memtx->base, NULL, 0);
+	if (memtx->func_key_format == NULL) {
+		diag_log();
+		panic("failed to create functional index key format");
+	}
+	tuple_format_ref(memtx->func_key_format);
+
 	fiber_start(memtx->gc_fiber, memtx);
 	return memtx;
 fail:
diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h
index 41c1a65c74..f8735ea42c 100644
--- a/src/box/memtx_engine.h
+++ b/src/box/memtx_engine.h
@@ -106,7 +106,7 @@ enum memtx_reserve_extents_num {
  * allocated for each iterator (except rtree index iterator that
  * is significantly bigger so has own pool).
  */
-#define MEMTX_ITERATOR_SIZE (192)
+#define MEMTX_ITERATOR_SIZE (160)
 
 struct memtx_engine {
 	struct engine base;
@@ -169,6 +169,10 @@ struct memtx_engine {
 	 * memtx_gc_task::link.
 	 */
 	struct stailq gc_queue;
+	/**
+	 * Format used for allocating functional index keys.
+	 */
+	struct tuple_format *func_key_format;
 };
 
 struct memtx_gc_task;
diff --git a/src/box/memtx_tree.cc b/src/box/memtx_tree.cc
index 01fa7badb1..1ec614f6cf 100644
--- a/src/box/memtx_tree.cc
+++ b/src/box/memtx_tree.cc
@@ -254,16 +254,14 @@ struct tree_iterator {
 	struct memtx_tree_key_data<USE_HINT> key_data;
 	struct memtx_tree_data<USE_HINT> current;
 	/**
-	 * For functional indexes only: copy of the functional index key at the
-	 * current iterator position. Allocated from current_func_key_buf or on
-	 * malloc.
+	 * For functional indexes only: reference to the functional index key
+	 * at the current iterator position.
 	 *
 	 * Since pinning a tuple doesn't prevent its functional keys from being
-	 * deleted, we need to copy it so that we can use it to restore the
-	 * iterator position.
+	 * deleted, we need to reference the key so that we can use it to
+	 * restore the iterator position.
 	 */
-	void *current_func_key;
-	char current_func_key_buf[32];
+	struct tuple *current_func_key;
 	/** Memory pool the iterator was allocated from. */
 	struct mempool *pool;
 };
@@ -293,21 +291,12 @@ tree_iterator_set_current_hint(struct tree_iterator<USE_HINT> *it, hint_t hint)
 {
 	if (!USE_HINT)
 		return;
-	if (it->current_func_key != NULL &&
-	    it->current_func_key != it->current_func_key_buf) {
-		free(it->current_func_key);
-	}
+	if (it->current_func_key != NULL)
+		tuple_unref(it->current_func_key);
 	it->current_func_key = NULL;
 	if (hint != HINT_NONE && it->base.index->def->key_def->for_func_index) {
-		void *key = (void *)hint;
-		uint32_t key_sz = memtx_alloc_size(key);
-		if (key_sz <= sizeof(it->current_func_key_buf)) {
-			it->current_func_key = it->current_func_key_buf;
-		} else {
-			it->current_func_key = xmalloc(key_sz);
-		}
-		memcpy(it->current_func_key, key, key_sz);
-		hint = (hint_t)it->current_func_key;
+		it->current_func_key = (struct tuple *)hint;
+		tuple_ref(it->current_func_key);
 	}
 	it->current.set_hint(hint);
 }
@@ -1194,34 +1183,6 @@ memtx_tree_index_replace_multikey(struct index *base, struct tuple *old_tuple,
 	return 0;
 }
 
-/** A dummy key allocator used when removing tuples from an index. */
-static const char *
-func_index_key_dummy_alloc(const char *key, uint32_t key_sz)
-{
-	(void) key_sz;
-	return key;
-}
-
-/** Allocator used for allocating functional index key parts. */
-static const char *
-func_index_key_alloc(const char *key, uint32_t key_sz)
-{
-	void *ptr = memtx_alloc(key_sz);
-	if (ptr == NULL) {
-		diag_set(OutOfMemory, key_sz, "MemtxAllocator::alloc",
-			 "functional index key part");
-		return NULL;
-	}
-	memcpy(ptr, key, key_sz);
-	return (const char *)ptr;
-}
-
-static void
-func_index_key_free(const char *key)
-{
-	memtx_free((void *)key);
-}
-
 /**
  * An undo entry for multikey functional index replace operation.
  * Used to roll back a failed insert/replace and restore the
@@ -1262,7 +1223,7 @@ memtx_tree_func_index_replace_rollback(struct memtx_tree_index<true> *index,
 	struct func_key_undo *entry;
 	rlist_foreach_entry(entry, new_keys, link) {
 		memtx_tree_delete_value(&index->tree, entry->key, NULL);
-		func_index_key_free((const char *)entry->key.hint);
+		tuple_unref((struct tuple *)entry->key.hint);
 	}
 	rlist_foreach_entry(entry, old_keys, link)
 		memtx_tree_insert(&index->tree, entry->key, NULL, NULL);
@@ -1288,6 +1249,7 @@ memtx_tree_func_index_replace(struct index *base, struct tuple *old_tuple,
 	/* FUNC doesn't support successor for now. */
 	*successor = NULL;
 
+	struct memtx_engine *memtx = (struct memtx_engine *)base->engine;
 	struct memtx_tree_index<true> *index =
 		(struct memtx_tree_index<true> *)base;
 	struct index_def *index_def = index->base.def;
@@ -1304,20 +1266,20 @@ memtx_tree_func_index_replace(struct index *base, struct tuple *old_tuple,
 		rlist_create(&old_keys);
 		rlist_create(&new_keys);
 		if (key_list_iterator_create(&it, new_tuple, index_def, true,
-					     func_index_key_alloc) != 0)
+					     memtx->func_key_format) != 0)
 			goto end;
 		int err = 0;
-		const char *key;
+		struct tuple *key;
 		struct func_key_undo *undo;
 		while ((err = key_list_iterator_next(&it, &key)) == 0 &&
 			key != NULL) {
 			/* Perform insertion, log it in list. */
 			undo = func_key_undo_new(region);
 			if (undo == NULL) {
-				func_index_key_free(key);
 				err = -1;
 				break;
 			}
+			tuple_ref(key);
 			undo->key.tuple = new_tuple;
 			undo->key.hint = (hint_t)key;
 			rlist_add(&new_keys, &undo->link);
@@ -1353,8 +1315,7 @@ memtx_tree_func_index_replace(struct index *base, struct tuple *old_tuple,
 				 * Remove the replaced tuple undo
 				 * from undo list.
 				 */
-				func_index_key_free(
-					(const char *)old_data.hint);
+				tuple_unref((struct tuple *)old_data.hint);
 				rlist_foreach_entry(undo, &new_keys, link) {
 					if (undo->key.hint == old_data.hint) {
 						rlist_del(&undo->link);
@@ -1377,16 +1338,21 @@ memtx_tree_func_index_replace(struct index *base, struct tuple *old_tuple,
 		 * replaced entries.
 		 */
 		rlist_foreach_entry(undo, &old_keys, link) {
-			func_index_key_free((const char *)undo->key.hint);
+			tuple_unref((struct tuple *)undo->key.hint);
 		}
 	}
 	if (old_tuple != NULL) {
+		/*
+		 * Use the runtime format to avoid OOM while deleting a tuple
+		 * from a space. It's okay, because we are not going to store
+		 * the keys in the index.
+		 */
 		if (key_list_iterator_create(&it, old_tuple, index_def, false,
-					     func_index_key_dummy_alloc) != 0)
+					     tuple_format_runtime) != 0)
 			goto end;
 		struct memtx_tree_data<true> data, deleted_data;
 		data.tuple = old_tuple;
-		const char *key;
+		struct tuple *key;
 		while (key_list_iterator_next(&it, &key) == 0 && key != NULL) {
 			data.hint = (hint_t) key;
 			deleted_data.tuple = NULL;
@@ -1397,8 +1363,7 @@ memtx_tree_func_index_replace(struct index *base, struct tuple *old_tuple,
 				 * Release related hint on
 				 * successful node deletion.
 				 */
-				func_index_key_free(
-					(const char *)deleted_data.hint);
+				tuple_unref((struct tuple *)deleted_data.hint);
 			}
 		}
 		assert(key == NULL);
@@ -1564,7 +1529,9 @@ memtx_tree_index_build_next_multikey(struct index *base, struct tuple *tuple)
 static int
 memtx_tree_func_index_build_next(struct index *base, struct tuple *tuple)
 {
-	struct memtx_tree_index<true> *index = (struct memtx_tree_index<true> *)base;
+	struct memtx_engine *memtx = (struct memtx_engine *)base->engine;
+	struct memtx_tree_index<true> *index =
+		(struct memtx_tree_index<true> *)base;
 	struct index_def *index_def = index->base.def;
 	assert(index_def->key_def->for_func_index);
 
@@ -1573,22 +1540,23 @@ memtx_tree_func_index_build_next(struct index *base, struct tuple *tuple)
 
 	struct key_list_iterator it;
 	if (key_list_iterator_create(&it, tuple, index_def, false,
-				     func_index_key_alloc) != 0)
+				     memtx->func_key_format) != 0)
 		return -1;
 
-	const char *key;
+	struct tuple *key;
 	uint32_t insert_idx = index->build_array_size;
 	while (key_list_iterator_next(&it, &key) == 0 && key != NULL) {
 		if (memtx_tree_index_build_array_append(index, tuple,
 							(hint_t)key) != 0)
 			goto error;
+		tuple_ref(key);
 	}
 	assert(key == NULL);
 	region_truncate(region, region_svp);
 	return 0;
 error:
 	for (uint32_t i = insert_idx; i < index->build_array_size; i++) {
-		func_index_key_free((const char *)index->build_array[i].hint);
+		tuple_unref((struct tuple *)index->build_array[i].hint);
 	}
 	region_truncate(region, region_svp);
 	return -1;
@@ -1601,8 +1569,8 @@ memtx_tree_func_index_build_next(struct index *base, struct tuple *tuple)
  */
 template <bool USE_HINT>
 static void
-memtx_tree_index_build_array_deduplicate(struct memtx_tree_index<USE_HINT> *index,
-			void (*destroy)(const char *hint))
+memtx_tree_index_build_array_deduplicate(
+	struct memtx_tree_index<USE_HINT> *index)
 {
 	if (index->build_array_size == 0)
 		return;
@@ -1624,11 +1592,12 @@ memtx_tree_index_build_array_deduplicate(struct memtx_tree_index<USE_HINT> *inde
 		}
 		r_idx++;
 	}
-	if (destroy != NULL) {
+	if (cmp_def->for_func_index) {
 		/* Destroy deduplicated entries. */
 		for (r_idx = w_idx + 1;
 		     r_idx < index->build_array_size; r_idx++) {
-			destroy((const char *)index->build_array[r_idx].hint);
+			hint_t hint = index->build_array[r_idx].hint;
+			tuple_unref((struct tuple *)hint);
 		}
 	}
 	index->build_array_size = w_idx + 1;
@@ -1644,7 +1613,7 @@ memtx_tree_index_end_build(struct index *base)
 	qsort_arg(index->build_array, index->build_array_size,
 		  sizeof(index->build_array[0]),
 		  memtx_tree_qcompare<USE_HINT>, cmp_def);
-	if (cmp_def->is_multikey) {
+	if (cmp_def->is_multikey || cmp_def->for_func_index) {
 		/*
 		 * Multikey index may have equal(in terms of
 		 * cmp_def) keys inserted by different multikey
@@ -1652,10 +1621,7 @@ memtx_tree_index_end_build(struct index *base)
 		 * the following memtx_tree_build assumes that
 		 * all keys are unique.
 		 */
-		memtx_tree_index_build_array_deduplicate<USE_HINT>(index, NULL);
-	} else if (cmp_def->for_func_index) {
-		memtx_tree_index_build_array_deduplicate<USE_HINT>(index,
-							 func_index_key_free);
+		memtx_tree_index_build_array_deduplicate<USE_HINT>(index);
 	}
 	memtx_tree_build(&index->tree, index->build_array,
 			 index->build_array_size);
diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index 6f3b6f0c3b..3841ba5758 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -1411,8 +1411,8 @@ func_index_compare(struct tuple *tuple_a, hint_t tuple_a_hint,
 	assert(cmp_def->for_func_index);
 	assert(is_nullable == cmp_def->is_nullable);
 
-	const char *key_a = (const char *)tuple_a_hint;
-	const char *key_b = (const char *)tuple_b_hint;
+	const char *key_a = tuple_data((struct tuple *)tuple_a_hint);
+	const char *key_b = tuple_data((struct tuple *)tuple_b_hint);
 	assert(mp_typeof(*key_a) == MP_ARRAY);
 	uint32_t part_count_a = mp_decode_array(&key_a);
 	assert(mp_typeof(*key_b) == MP_ARRAY);
@@ -1469,7 +1469,7 @@ func_index_compare_with_key(struct tuple *tuple, hint_t tuple_hint,
 	(void)tuple; (void)key_hint;
 	assert(key_def->for_func_index);
 	assert(is_nullable == key_def->is_nullable);
-	const char *tuple_key = (const char *)tuple_hint;
+	const char *tuple_key = tuple_data((struct tuple *)tuple_hint);
 	assert(mp_typeof(*tuple_key) == MP_ARRAY);
 
 	uint32_t tuple_key_count = mp_decode_array(&tuple_key);
diff --git a/test/wal_off/alter.result b/test/wal_off/alter.result
index 97f7e6f190..5403c0dd73 100644
--- a/test/wal_off/alter.result
+++ b/test/wal_off/alter.result
@@ -28,7 +28,7 @@ end;
 ...
 #spaces;
 ---
-- 65501
+- 65500
 ...
 -- cleanup
 for k, v in pairs(spaces) do
-- 
GitLab