From 5026c087be09efd81399ba6764872f2fbd55c696 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja.osipov@gmail.com>
Date: Fri, 20 Jan 2012 14:29:48 +0400
Subject: [PATCH] tree-index-opt: make struct key_def an independent member of
 struct space

Code review fixes for branch tree-index-opt:
to avoid unnecessary copying and settle the issue of
ownership of key_def structures, move their ownership
to struct space.

Now Index uses its key_def by pointer, and all key_defs are
allocated and freed separately in struct space
allocation/deallocation functions.

Fix coding style (use Lisp/Emacs style function
call style, as per LKML which we follow).
---
 mod/box/box.h       |  17 +++--
 mod/box/box.m       |  68 +++++++++++--------
 mod/box/box_lua.m   |   6 +-
 mod/box/index.h     |  10 +--
 mod/box/index.m     |  41 ++++++------
 mod/box/memcached.m |  30 +++++----
 mod/box/tree.h      |   4 +-
 mod/box/tree.m      | 157 +++++++++++++++++++++-----------------------
 8 files changed, 175 insertions(+), 158 deletions(-)

diff --git a/mod/box/box.h b/mod/box/box.h
index 41582a3c87..443478afa1 100644
--- a/mod/box/box.h
+++ b/mod/box/box.h
@@ -43,15 +43,13 @@ enum
 
 struct space {
 	Index *index[BOX_INDEX_MAX];
+	/** Size of index array */
+	int key_count;
 	int n;
 	int cardinality;
 
-	/**
-	 * Inferred data: max field no which participates in an
-	 * index. Each tuple in this space must have, therefore, at
-	 * least indexed_field_count fields.
-	 */
-	int field_count;
+	/** Index metadata. */
+	struct key_def *key_defs;
 	/**
 	 * Field types of indexed fields. This is an array of size
 	 * indexed_field_count. If there are gaps, i.e. fields
@@ -61,6 +59,13 @@ struct space {
 	 * set for fields which types in two indexes contradict.
 	 */
 	enum field_data_type *field_types;
+	/**
+	 * Inferred data: max field no which participates in an
+	 * index. Each tuple in this space must have, therefore, at
+	 * least indexed_field_count fields.
+	 */
+	int field_count;
+
 	bool enabled;
 };
 
diff --git a/mod/box/box.m b/mod/box/box.m
index a030dee0e5..ecd4afb628 100644
--- a/mod/box/box.m
+++ b/mod/box/box.m
@@ -126,23 +126,23 @@ validate_indexes(struct box_txn *txn)
 	/* There is more than one index. */
 	foreach_index(txn->n, index) {
 		/* XXX: skip the first index here! */
-		for (u32 f = 0; f < index->key_def.part_count; ++f) {
-			if (index->key_def.parts[f].fieldno >= txn->tuple->cardinality)
+		for (u32 f = 0; f < index->key_def->part_count; ++f) {
+			if (index->key_def->parts[f].fieldno >= txn->tuple->cardinality)
 				tnt_raise(IllegalParams, :"tuple must have all indexed fields");
 
-			if (index->key_def.parts[f].type == STRING)
+			if (index->key_def->parts[f].type == STRING)
 				continue;
 
-			void *field = tuple_field(txn->tuple, index->key_def.parts[f].fieldno);
+			void *field = tuple_field(txn->tuple, index->key_def->parts[f].fieldno);
 			u32 len = load_varint32(&field);
 
-			if (index->key_def.parts[f].type == NUM && len != sizeof(u32))
+			if (index->key_def->parts[f].type == NUM && len != sizeof(u32))
 				tnt_raise(IllegalParams, :"field must be NUM");
 
-			if (index->key_def.parts[f].type == NUM64 && len != sizeof(u64))
+			if (index->key_def->parts[f].type == NUM64 && len != sizeof(u64))
 				tnt_raise(IllegalParams, :"field must be NUM64");
 		}
-		if (index->type == TREE && index->key_def.is_unique == false)
+		if (index->type == TREE && index->key_def->is_unique == false)
 			/* Don't check non unique indexes */
 			continue;
 
@@ -184,8 +184,8 @@ prepare_replace(struct box_txn *txn, size_t cardinality, struct tbuf *data)
 	if (txn->old_tuple != NULL) {
 #ifndef NDEBUG
 		void *ka, *kb;
-		ka = tuple_field(txn->tuple, txn->index->key_def.parts[0].fieldno);
-		kb = tuple_field(txn->old_tuple, txn->index->key_def.parts[0].fieldno);
+		ka = tuple_field(txn->tuple, txn->index->key_def->parts[0].fieldno);
+		kb = tuple_field(txn->old_tuple, txn->index->key_def->parts[0].fieldno);
 		int kal, kab;
 		kal = load_varint32(&ka);
 		kab = load_varint32(&kb);
@@ -919,6 +919,14 @@ xlog_print(struct recovery_state *r __attribute__((unused)), struct tbuf *t)
 	return res;
 }
 
+/** Free a key definition. */
+static void
+key_free(struct key_def *key_def)
+{
+	free(key_def->parts);
+	free(key_def->cmp_order);
+}
+
 void
 space_free(void)
 {
@@ -933,9 +941,11 @@ space_free(void)
 			if (index == nil)
 				break;
 			[index free];
+			key_free(&space[i].key_defs[j]);
 		}
 
-		sfree(space[i].field_types);
+		free(space[i].key_defs);
+		free(space[i].field_types);
 	}
 }
 
@@ -959,14 +969,14 @@ key_init(struct key_def *def, struct tarantool_cfg_space_index *cfg_index)
 	}
 
 	/* init def array */
-	def->parts = salloc(sizeof(struct key_part) * def->part_count);
+	def->parts = malloc(sizeof(struct key_part) * def->part_count);
 	if (def->parts == NULL) {
 		panic("can't allocate def parts array for index");
 	}
 
 	/* init compare order array */
 	def->max_fieldno++;
-	def->cmp_order = salloc(def->max_fieldno * sizeof(u32));
+	def->cmp_order = malloc(def->max_fieldno * sizeof(u32));
 	if (def->cmp_order == NULL) {
 		panic("can't allocate def cmp_order array for index");
 	}
@@ -998,9 +1008,11 @@ key_init(struct key_def *def, struct tarantool_cfg_space_index *cfg_index)
  * @param key_defs	key description array
  */
 static void
-extract_field_types(struct space *space, int key_count, struct key_def *key_defs)
+space_init_field_types(struct space *space)
 {
 	int i, field_count;
+	int key_count = space->key_count;
+	struct key_def *key_defs = space->key_defs;
 
 	/* find max max field no */
 	field_count = 0;
@@ -1010,7 +1022,7 @@ extract_field_types(struct space *space, int key_count, struct key_def *key_defs
 
 	/* alloc & init field type info */
 	space->field_count = field_count;
-	space->field_types = salloc(field_count * sizeof(int));
+	space->field_types = malloc(field_count * sizeof(enum field_data_type));
 	for (i = 0; i < field_count; i++) {
 		space->field_types[i] = UNKNOWN;
 	}
@@ -1061,35 +1073,35 @@ space_config(void)
 		space[i].n = i;
 		space[i].cardinality = cfg_space->cardinality;
 
-		/* count keys */
-		int key_count = 0;
+		/*
+		 * Collect key/field info. We need aggregate
+		 * information on all keys before we can create
+		 * indexes.
+		 */
+		space[i].key_count = 0;
 		for (int j = 0; cfg_space->index[j] != NULL; ++j) {
-			++key_count;
+			++space[i].key_count;
 		}
 
-		/* collect key/field info */
-		struct key_def *key_defs = malloc(key_count * sizeof(struct key_def));
-		if (key_defs == NULL) {
+		space[i].key_defs = malloc(space[i].key_count *
+					    sizeof(struct key_def));
+		if (space[i].key_defs == NULL) {
 			panic("can't allocate key def array");
 		}
 		for (int j = 0; cfg_space->index[j] != NULL; ++j) {
 			typeof(cfg_space->index[j]) cfg_index = cfg_space->index[j];
-			key_init(&key_defs[j], cfg_index);
+			key_init(&space[i].key_defs[j], cfg_index);
 		}
-		extract_field_types(&space[i], key_count, key_defs);
+		space_init_field_types(&space[i]);
 
 		/* fill space indexes */
 		for (int j = 0; cfg_space->index[j] != NULL; ++j) {
 			typeof(cfg_space->index[j]) cfg_index = cfg_space->index[j];
 			enum index_type type = STR2ENUM(index_type, cfg_index->type);
-			Index *index = [Index alloc: type :&key_defs[j] :&space[i]];
-			[index init: type :&key_defs[j] :&space[i] :j];
+			Index *index = [Index alloc: type :&space[i] :j];
+			[index init: type :&space[i] :j];
 			space[i].index[j] = index;
 		}
-
-		/* free temp data */
-		free(key_defs);
-
 		say_info("space %i successfully configured", i);
 	}
 }
diff --git a/mod/box/box_lua.m b/mod/box/box_lua.m
index b8a1364f50..02f30c317d 100644
--- a/mod/box/box_lua.m
+++ b/mod/box/box_lua.m
@@ -420,7 +420,7 @@ lbox_index_next(struct lua_State *L)
 			struct tbuf *data = tbuf_alloc(fiber->gc_pool);
 			for (int i = 0; i < argc; ++i)
 				append_key_part(L, i + 2, data,
-						index->key_def.parts[i].type);
+						index->key_def->parts[i].type);
 			key = data->data;
 		}
 		/*
@@ -429,10 +429,10 @@ lbox_index_next(struct lua_State *L)
 		 * keys.
 		*/
 		assert(cardinality != 0);
-		if (cardinality > index->key_def.part_count)
+		if (cardinality > index->key_def->part_count)
 			luaL_error(L, "index.next(): key part count (%d) "
 				   "does not match index cardinality (%d)",
-				   cardinality, index->key_def.part_count);
+				   cardinality, index->key_def->part_count);
 		it = [index allocIterator];
 		[index initIterator: it :key :cardinality];
 		lbox_pushiterator(L, it);
diff --git a/mod/box/index.h b/mod/box/index.h
index ccbeeba4e9..0ccd99bf11 100644
--- a/mod/box/index.h
+++ b/mod/box/index.h
@@ -85,23 +85,23 @@ struct key_def {
 	 */
 	struct iterator *position;
 	/* Description of a possibly multipart key. */
-	struct key_def key_def;
+	struct key_def *key_def;
 	enum index_type type;
 	bool enabled;
 	/* Relative offset of the index in its namespace. */
 	u32 n;
 };
 
-+ (Index *) alloc: (enum index_type) type :(struct key_def *) key_def
-	:(struct space *) space;
++ (Index *) alloc: (enum index_type) type :(struct space *) space
+	:(u32) n_arg;
 /**
  * Initialize index instance.
  *
  * @param space    space the index belongs to
  * @param key      key part description
  */
-- (id) init: (enum index_type) type_arg :(struct key_def *) key_def_arg
-	:(struct space *) space_arg :(u32) n_arg;
+- (id) init: (enum index_type) type_arg :(struct space *) space_arg
+	:(u32) n_arg;
 /** Destroy and free index instance. */
 - (void) free;
 /**
diff --git a/mod/box/index.m b/mod/box/index.m
index a358f8cc9a..112c913843 100644
--- a/mod/box/index.m
+++ b/mod/box/index.m
@@ -58,9 +58,10 @@ iterator_first_equal(struct iterator *it)
 @class HashStrIndex;
 @class TreeIndex;
 
-+ (Index *) alloc: (enum index_type) type :(struct key_def *) key_def
-	:(struct space *) space;
++ (Index *) alloc: (enum index_type) type :(struct space *) space
+	:(u32) n_arg;
 {
+	struct key_def *key_def = &space->key_defs[n_arg];
 	switch (type) {
 	case HASH:
 		/* Hash index, check key type.
@@ -78,18 +79,18 @@ iterator_first_equal(struct iterator *it)
 		}
 		break;
 	case TREE:
-		return [TreeIndex alloc: key_def :space];
+		return [TreeIndex alloc: space :n_arg];
 	default:
 		break;
 	}
 	panic("unsupported index type");
 }
 
-- (id) init: (enum index_type) type_arg :(struct key_def *) key_def_arg
-	:(struct space *) space_arg :(u32) n_arg;
+- (id) init: (enum index_type) type_arg :(struct space *) space_arg
+	:(u32) n_arg;
 {
 	self = [super init];
-	key_def = *key_def_arg;
+	key_def = &space_arg->key_defs[n_arg];
 	type = type_arg;
 	n = n_arg;
 	space = space_arg;
@@ -100,8 +101,6 @@ iterator_first_equal(struct iterator *it)
 
 - (void) free
 {
-	sfree(key_def.parts);
-	sfree(key_def.cmp_order);
 	position->free(position);
 	[super free];
 }
@@ -242,9 +241,10 @@ hash_iterator_free(struct iterator *iterator)
 - (struct box_tuple *) findByTuple: (struct box_tuple *) tuple
 {
 	/* Hash index currently is always single-part. */
-	void *field = tuple_field(tuple, key_def.parts[0].fieldno);
+	void *field = tuple_field(tuple, key_def->parts[0].fieldno);
 	if (field == NULL)
-		tnt_raise(ClientError, :ER_NO_SUCH_FIELD, key_def.parts[0].fieldno);
+		tnt_raise(ClientError, :ER_NO_SUCH_FIELD,
+			  key_def->parts[0].fieldno);
 	return [self find: field];
 }
 
@@ -307,7 +307,7 @@ hash_iterator_free(struct iterator *iterator)
 
 - (void) remove: (struct box_tuple *) tuple
 {
-	void *field = tuple_field(tuple, key_def.parts[0].fieldno);
+	void *field = tuple_field(tuple, key_def->parts[0].fieldno);
 	unsigned int field_size = load_varint32(&field);
 	u32 num = *(u32 *)field;
 
@@ -325,7 +325,7 @@ hash_iterator_free(struct iterator *iterator)
 - (void) replace: (struct box_tuple *) old_tuple
 	:(struct box_tuple *) new_tuple
 {
-	void *field = tuple_field(new_tuple, key_def.parts[0].fieldno);
+	void *field = tuple_field(new_tuple, key_def->parts[0].fieldno);
 	u32 field_size = load_varint32(&field);
 	u32 num = *(u32 *)field;
 
@@ -333,7 +333,8 @@ hash_iterator_free(struct iterator *iterator)
 		tnt_raise(IllegalParams, :"key is not u32");
 
 	if (old_tuple != NULL) {
-		void *old_field = tuple_field(old_tuple, key_def.parts[0].fieldno);
+		void *old_field = tuple_field(old_tuple,
+					      key_def->parts[0].fieldno);
 		load_varint32(&old_field);
 		u32 old_num = *(u32 *)old_field;
 		mh_int_t k = mh_i32ptr_get(int_hash, old_num);
@@ -428,7 +429,7 @@ hash_iterator_free(struct iterator *iterator)
 
 - (void) remove: (struct box_tuple *) tuple
 {
-	void *field = tuple_field(tuple, key_def.parts[0].fieldno);
+	void *field = tuple_field(tuple, key_def->parts[0].fieldno);
 	unsigned int field_size = load_varint32(&field);
 	u64 num = *(u64 *)field;
 
@@ -446,7 +447,7 @@ hash_iterator_free(struct iterator *iterator)
 - (void) replace: (struct box_tuple *) old_tuple
 	:(struct box_tuple *) new_tuple
 {
-	void *field = tuple_field(new_tuple, key_def.parts[0].fieldno);
+	void *field = tuple_field(new_tuple, key_def->parts[0].fieldno);
 	u32 field_size = load_varint32(&field);
 	u64 num = *(u64 *)field;
 
@@ -455,7 +456,7 @@ hash_iterator_free(struct iterator *iterator)
 
 	if (old_tuple != NULL) {
 		void *old_field = tuple_field(old_tuple,
-					      key_def.parts[0].fieldno);
+					      key_def->parts[0].fieldno);
 		load_varint32(&old_field);
 		u64 old_num = *(u64 *)old_field;
 		mh_int_t k = mh_i64ptr_get(int64_hash, old_num);
@@ -547,7 +548,7 @@ hash_iterator_free(struct iterator *iterator)
 
 - (void) remove: (struct box_tuple *) tuple
 {
-	void *field = tuple_field(tuple, key_def.parts[0].fieldno);
+	void *field = tuple_field(tuple, key_def->parts[0].fieldno);
 
 	mh_int_t k = mh_lstrptr_get(str_hash, field);
 	if (k != mh_end(str_hash))
@@ -562,15 +563,15 @@ hash_iterator_free(struct iterator *iterator)
 - (void) replace: (struct box_tuple *) old_tuple
 	:(struct box_tuple *) new_tuple
 {
-	void *field = tuple_field(new_tuple, key_def.parts[0].fieldno);
+	void *field = tuple_field(new_tuple, key_def->parts[0].fieldno);
 
 	if (field == NULL)
 		tnt_raise(ClientError, :ER_NO_SUCH_FIELD,
-			  key_def.parts[0].fieldno);
+			  key_def->parts[0].fieldno);
 
 	if (old_tuple != NULL) {
 		void *old_field = tuple_field(old_tuple,
-					      key_def.parts[0].fieldno);
+					      key_def->parts[0].fieldno);
 		mh_int_t k = mh_lstrptr_get(str_hash, old_field);
 		if (k != mh_end(str_hash))
 			mh_lstrptr_del(str_hash, k);
diff --git a/mod/box/memcached.m b/mod/box/memcached.m
index ff1e363c93..c79312df74 100644
--- a/mod/box/memcached.m
+++ b/mod/box/memcached.m
@@ -438,26 +438,32 @@ memcached_space_init()
 	memc_s->cardinality = 4;
 	memc_s->n = cfg.memcached_space;
 
-	struct key_def key_def;
+	memc_s->key_count = 1;
+	memc_s->key_defs = malloc(sizeof(struct key_def));
+
+	if (memc_s->key_defs == NULL)
+		panic("out of memory when configuring memcached_space");
+
+	struct key_def *key_def = memc_s->key_defs;
 	/* Configure memcached index key. */
-	key_def.part_count = 1;
-	key_def.is_unique = true;
+	key_def->part_count = 1;
+	key_def->is_unique = true;
 
-	key_def.parts = salloc(sizeof(struct key_part));
-	key_def.cmp_order = salloc(sizeof(u32));
+	key_def->parts = malloc(sizeof(struct key_part));
+	key_def->cmp_order = malloc(sizeof(u32));
 
-	if (key_def.parts == NULL || key_def.cmp_order == NULL)
+	if (key_def->parts == NULL || key_def->cmp_order == NULL)
 		panic("out of memory when configuring memcached_space");
 
-	key_def.parts[0].fieldno = 0;
-	key_def.parts[0].type = STRING;
+	key_def->parts[0].fieldno = 0;
+	key_def->parts[0].type = STRING;
 
-	key_def.max_fieldno = 1;
-	key_def.cmp_order[0] = 0;
+	key_def->max_fieldno = 1;
+	key_def->cmp_order[0] = 0;
 
 	/* Configure memcached index. */
-	Index *memc_index = memc_s->index[0] = [Index alloc: HASH :&key_def :space];
-	[memc_index init: HASH :&key_def :memc_s :0];
+	Index *memc_index = memc_s->index[0] = [Index alloc: HASH :memc_s :0];
+	[memc_index init: HASH :memc_s :0];
 }
 
 /** Delete a bunch of expired keys. */
diff --git a/mod/box/tree.h b/mod/box/tree.h
index f0ea087e82..57b1f989da 100644
--- a/mod/box/tree.h
+++ b/mod/box/tree.h
@@ -41,10 +41,10 @@ typedef int (*tree_cmp_t)(const void *, const void *, void *);
 	sptree_index tree;
 };
 
-+ (Index *) alloc: (struct key_def *) key_def :(struct space *) space;
++ (Index *) alloc: (struct space *) space :(u32) n_arg;
 - (void) build: (Index *) pk;
 
-/* to be defined in subclasses */
+/** To be defined in subclasses. */
 - (size_t) node_size;
 - (tree_cmp_t) node_cmp;
 - (tree_cmp_t) dup_node_cmp;
diff --git a/mod/box/tree.m b/mod/box/tree.m
index d1c886038f..2649bc3a1a 100644
--- a/mod/box/tree.m
+++ b/mod/box/tree.m
@@ -404,10 +404,9 @@ fold_with_num32_value(struct key_def *key_def, struct box_tuple *tuple)
  * Compare a part for two keys.
  */
 static int
-sparse_part_compare(
-	enum field_data_type type,
-	const u8 *data_a, union sparse_part part_a,
-	const u8 *data_b, union sparse_part part_b)
+sparse_part_compare(enum field_data_type type,
+		    const u8 *data_a, union sparse_part part_a,
+		    const u8 *data_b, union sparse_part part_b)
 {
 	if (type == NUM) {
 		return u32_cmp(part_a.num32, part_b.num32);
@@ -446,16 +445,16 @@ sparse_part_compare(
  * Compare a key for two sparse nodes.
  */
 static int
-sparse_node_compare(
-	struct key_def *key_def,
-	struct box_tuple *tuple_a, const union sparse_part* parts_a,
-	struct box_tuple *tuple_b, const union sparse_part* parts_b)
+sparse_node_compare(struct key_def *key_def,
+		    struct box_tuple *tuple_a,
+		    const union sparse_part* parts_a,
+		    struct box_tuple *tuple_b,
+		    const union sparse_part* parts_b)
 {
 	for (int part = 0; part < key_def->part_count; ++part) {
-		int r = sparse_part_compare(
-			key_def->parts[part].type,
-			tuple_a->data, parts_a[part],
-			tuple_b->data, parts_b[part]);
+		int r = sparse_part_compare(key_def->parts[part].type,
+					    tuple_a->data, parts_a[part],
+					    tuple_b->data, parts_b[part]);
 		if (r) {
 			return r;
 		}
@@ -467,15 +466,16 @@ sparse_node_compare(
  * Compare a key for a key search data and a sparse node.
  */
 static int
-sparse_key_node_compare(
-	struct key_def *key_def, const struct key_data *key_data,
-	struct box_tuple *tuple, const union sparse_part* parts)
+sparse_key_node_compare(struct key_def *key_def,
+			const struct key_data *key_data,
+			struct box_tuple *tuple,
+			const union sparse_part* parts)
 {
 	for (int part = 0; part < key_data->part_count; ++part) {
-		int r = sparse_part_compare(
-			key_def->parts[part].type,
-			key_data->data, key_data->parts[part],
-			tuple->data, parts[part]);
+		int r = sparse_part_compare(key_def->parts[part].type,
+					    key_data->data,
+					    key_data->parts[part],
+					    tuple->data, parts[part]);
 		if (r) {
 			return r;
 		}
@@ -487,10 +487,8 @@ sparse_key_node_compare(
  * Compare a part for two dense keys.
  */
 static int
-dense_part_compare(
-	enum field_data_type type,
-	const u8 *data_a, u32 offset_a,
-	const u8 *data_b, u32 offset_b)
+dense_part_compare(enum field_data_type type, const u8 *data_a,
+		   u32 offset_a, const u8 *data_b, u32 offset_b)
 {
 	const u8 *ad = data_a + offset_a;
 	const u8 *bd = data_b + offset_b;
@@ -521,11 +519,9 @@ dense_part_compare(
  * Compare a key for two dense nodes.
  */
 static int
-dense_node_compare(
-	struct key_def *key_def,
-	u32 first_field,
-	struct box_tuple *tuple_a, u32 offset_a,
-	struct box_tuple *tuple_b, u32 offset_b)
+dense_node_compare(struct key_def *key_def, u32 first_field,
+		   struct box_tuple *tuple_a, u32 offset_a,
+		   struct box_tuple *tuple_b, u32 offset_b)
 {
 	/* find field offsets */
 	u32 off_a[key_def->part_count];
@@ -546,10 +542,11 @@ dense_node_compare(
 	/* compare key parts */
 	for (int part = 0; part < key_def->part_count; ++part) {
 		int field = key_def->parts[part].fieldno;
-		int r = dense_part_compare(
-			key_def->parts[part].type,
-			tuple_a->data, off_a[field - first_field],
-			tuple_b->data, off_b[field - first_field]);
+		int r = dense_part_compare(key_def->parts[part].type,
+					   tuple_a->data,
+					   off_a[field - first_field],
+					   tuple_b->data,
+					   off_b[field - first_field]);
 		if (r) {
 			return r;
 		}
@@ -561,10 +558,9 @@ dense_node_compare(
  * Compare a part for a key search data and a dense key.
  */
 static int
-dense_key_part_compare(
-	enum field_data_type type,
-	const u8 *data_a, union sparse_part part_a,
-	const u8 *data_b, u32 offset_b)
+dense_key_part_compare(enum field_data_type type,
+		       const u8 *data_a, union sparse_part part_a,
+		       const u8 *data_b, u32 offset_b)
 {
 	const u8 *bd = data_b + offset_b;
 	u32 bl = load_varint32((void *) &bd);
@@ -605,9 +601,9 @@ dense_key_part_compare(
  * Compare a key for a key search data and a dense node.
  */
 static int
-dense_key_node_compare(
-	struct key_def *key_def, const struct key_data *key_data,
-	u32 first_field, struct box_tuple *tuple, u32 offset)
+dense_key_node_compare(struct key_def *key_def,
+		       const struct key_data *key_data,
+		       u32 first_field, struct box_tuple *tuple, u32 offset)
 {
 	/* find field offsets */
 	u32 off[key_def->part_count];
@@ -622,10 +618,11 @@ dense_key_node_compare(
 	/* compare key parts */
 	for (int part = 0; part < key_data->part_count; ++part) {
 		int field = key_def->parts[part].fieldno;
-		int r = dense_key_part_compare(
-			key_def->parts[part].type,
-			key_data->data, key_data->parts[part],
-			tuple->data, off[field - first_field]);
+		int r = dense_key_part_compare(key_def->parts[part].type,
+					       key_data->data,
+					       key_data->parts[part],
+					       tuple->data,
+					       off[field - first_field]);
 		if (r) {
 			return r;
 		}
@@ -698,10 +695,9 @@ tree_iterator_free(struct iterator *iterator)
 @class Num32TreeIndex;
 @class FixedTreeIndex;
 
-+ (Index *) alloc: (struct key_def *) key_def
-		 : (struct space *) space
++ (Index *) alloc: (struct space *) space :(u32) n_arg
 {
-	enum tree_type type = find_tree_type(space, key_def);
+	enum tree_type type = find_tree_type(space, &space->key_defs[n_arg]);
 	switch (type) {
 	case TREE_SPARSE:
 		return [SparseTreeIndex alloc];
@@ -712,7 +708,7 @@ tree_iterator_free(struct iterator *iterator)
 	case TREE_FIXED:
 		return [FixedTreeIndex alloc];
 	}
-	panic("bad tree index type");
+	panic("tree index type not implemented");
 }
 
 - (void) free
@@ -758,7 +754,7 @@ tree_iterator_free(struct iterator *iterator)
 
 	key_data->data = key;
 	key_data->part_count = 1;
-	fold_with_key_parts(&key_def, key_data);
+	fold_with_key_parts(key_def, key_data);
 
 	void *node = sptree_index_find(&tree, key_data);
 	return [self unfold: node];
@@ -771,7 +767,7 @@ tree_iterator_free(struct iterator *iterator)
 
 	key_data->data = tuple->data;
 	key_data->part_count = tuple->cardinality;
-	fold_with_sparse_parts(&key_def, tuple, key_data->parts);
+	fold_with_sparse_parts(key_def, tuple, key_data->parts);
 
 	void *node = sptree_index_find(&tree, key_data);
 	return [self unfold: node];
@@ -787,8 +783,9 @@ tree_iterator_free(struct iterator *iterator)
 - (void) replace: (struct box_tuple *) old_tuple
 		: (struct box_tuple *) new_tuple
 {
-	if (new_tuple->cardinality < key_def.max_fieldno)
-		tnt_raise(ClientError, :ER_NO_SUCH_FIELD, key_def.max_fieldno);
+	if (new_tuple->cardinality < key_def->max_fieldno)
+		tnt_raise(ClientError, :ER_NO_SUCH_FIELD,
+			  key_def->max_fieldno);
 
 	void *node = alloca([self node_size]);
 	if (old_tuple) {
@@ -802,7 +799,7 @@ tree_iterator_free(struct iterator *iterator)
 - (struct iterator *) allocIterator
 {
 	struct tree_iterator *it
-		= salloc(sizeof(struct tree_iterator) + SIZEOF_SPARSE_PARTS(&key_def));
+		= salloc(sizeof(struct tree_iterator) + SIZEOF_SPARSE_PARTS(key_def));
 
 	if (it) {
 		memset(it, 0, sizeof(struct tree_iterator));
@@ -825,14 +822,14 @@ tree_iterator_free(struct iterator *iterator)
 	assert(iterator->next == tree_iterator_next);
 	struct tree_iterator *it = tree_iterator(iterator);
 
-	if (key_def.is_unique && part_count == key_def.part_count)
+	if (key_def->is_unique && part_count == key_def->part_count)
 		it->base.next_equal = iterator_first_equal;
 	else
 		it->base.next_equal = tree_iterator_next_equal;
 
 	it->key_data.data = key;
 	it->key_data.part_count = part_count;
-	fold_with_key_parts(&key_def, &it->key_data);
+	fold_with_key_parts(key_def, &it->key_data);
 	sptree_index_iterator_init_set(&tree, &it->iter, &it->key_data);
 }
 
@@ -875,7 +872,7 @@ tree_iterator_free(struct iterator *iterator)
 	sptree_index_init(&tree,
 			  node_size, nodes, n_tuples, estimated_tuples,
 			  [self key_node_cmp],
-			  key_def.is_unique ? [self node_cmp] : [self dup_node_cmp],
+			  key_def->is_unique ? [self node_cmp] : [self dup_node_cmp],
 			  self);
 
 	/* Done with it */
@@ -951,10 +948,9 @@ sparse_node_cmp(const void *node_a, const void *node_b, void *arg)
 	SparseTreeIndex *index = (SparseTreeIndex *) arg;
 	const struct sparse_node *node_xa = node_a;
 	const struct sparse_node *node_xb = node_b;
-	return sparse_node_compare(
-		&index->key_def,
-		node_xa->tuple, node_xa->parts,
-		node_xb->tuple, node_xb->parts);
+	return sparse_node_compare(index->key_def,
+				   node_xa->tuple, node_xa->parts,
+				   node_xb->tuple, node_xb->parts);
 }
 
 static int
@@ -975,16 +971,15 @@ sparse_key_node_cmp(const void *key, const void *node, void *arg)
 	SparseTreeIndex *index = (SparseTreeIndex *) arg;
 	const struct key_data *key_data = key;
 	const struct sparse_node *node_x = node;
-	return sparse_key_node_compare(
-		&index->key_def, key_data,
-		node_x->tuple, node_x->parts);
+	return sparse_key_node_compare(index->key_def, key_data,
+				       node_x->tuple, node_x->parts);
 }
 
 @implementation SparseTreeIndex
 
 - (size_t) node_size
 {
-	return sizeof(struct sparse_node) + SIZEOF_SPARSE_PARTS(&key_def);
+	return sizeof(struct sparse_node) + SIZEOF_SPARSE_PARTS(key_def);
 }
 
 - (tree_cmp_t) node_cmp
@@ -1006,7 +1001,7 @@ sparse_key_node_cmp(const void *key, const void *node, void *arg)
 {
 	struct sparse_node *node_x = node;
 	node_x->tuple = tuple;
-	fold_with_sparse_parts(&key_def, tuple, node_x->parts);
+	fold_with_sparse_parts(key_def, tuple, node_x->parts);
 }
 
 - (struct box_tuple *) unfold: (const void *) node
@@ -1033,10 +1028,9 @@ dense_node_cmp(const void *node_a, const void *node_b, void *arg)
 	DenseTreeIndex *index = (DenseTreeIndex *) arg;
 	const struct dense_node *node_xa = node_a;
 	const struct dense_node *node_xb = node_b;
-	return dense_node_compare(
-		&index->key_def, index->first_field,
-		node_xa->tuple, node_xa->offset,
-		node_xb->tuple, node_xb->offset);
+	return dense_node_compare(index->key_def, index->first_field,
+				  node_xa->tuple, node_xa->offset,
+				  node_xb->tuple, node_xb->offset);
 }
 
 static int
@@ -1057,9 +1051,9 @@ dense_key_node_cmp(const void *key, const void * node, void *arg)
 	DenseTreeIndex *index = (DenseTreeIndex *) arg;
 	const struct key_data *key_data = key;
 	const struct dense_node *node_x = node;
-	return dense_key_node_compare(
-		&index->key_def, key_data, index->first_field,
-		node_x->tuple, node_x->offset);
+	return dense_key_node_compare(index->key_def, key_data,
+				      index->first_field,
+				      node_x->tuple, node_x->offset);
 }
 
 @implementation DenseTreeIndex
@@ -1067,7 +1061,7 @@ dense_key_node_cmp(const void *key, const void * node, void *arg)
 - (void) enable
 {
 	[super enable];
-	first_field = find_first_field(&key_def);
+	first_field = find_first_field(key_def);
 }
 
 - (size_t) node_size
@@ -1094,7 +1088,7 @@ dense_key_node_cmp(const void *key, const void * node, void *arg)
 {
 	struct dense_node *node_x = node;
 	node_x->tuple = tuple;
-	node_x->offset = fold_with_dense_offset(&key_def, tuple);
+	node_x->offset = fold_with_dense_offset(key_def, tuple);
 }
 
 - (struct box_tuple *) unfold: (const void *) node
@@ -1168,7 +1162,7 @@ num32_key_node_cmp(const void * key, const void * node, void *arg)
 {
 	struct num32_node *node_x = (struct num32_node *) node;
 	node_x->tuple = tuple;
-	node_x->value = fold_with_num32_value(&key_def, tuple);
+	node_x->value = fold_with_num32_value(key_def, tuple);
 }
 
 - (struct box_tuple *) unfold: (const void *) node
@@ -1196,10 +1190,9 @@ fixed_node_cmp(const void *node_a, const void *node_b, void *arg)
 	FixedTreeIndex *index = (FixedTreeIndex *) arg;
 	const struct fixed_node *node_xa = node_a;
 	const struct fixed_node *node_xb = node_b;
-	return dense_node_compare(
-		&index->key_def, index->first_field,
-		node_xa->tuple, index->first_offset,
-		node_xb->tuple, index->first_offset);
+	return dense_node_compare(index->key_def, index->first_field,
+				  node_xa->tuple, index->first_offset,
+				  node_xb->tuple, index->first_offset);
 }
 
 static int
@@ -1220,9 +1213,9 @@ fixed_key_node_cmp(const void *key, const void * node, void *arg)
 	FixedTreeIndex *index = (FixedTreeIndex *) arg;
 	const struct key_data *key_data = key;
 	const struct fixed_node *node_x = node;
-	return dense_key_node_compare(
-		&index->key_def, key_data, index->first_field,
-		node_x->tuple, index->first_offset);
+	return dense_key_node_compare(index->key_def, key_data,
+				      index->first_field,
+				      node_x->tuple, index->first_offset);
 }
 
 @implementation FixedTreeIndex
@@ -1230,7 +1223,7 @@ fixed_key_node_cmp(const void *key, const void * node, void *arg)
 - (void) enable
 {
 	[super enable];
-	first_field = find_first_field(&key_def);
+	first_field = find_first_field(key_def);
 	first_offset = find_fixed_offset(space, first_field, 0);
 }
 
-- 
GitLab