From 1428c8d727d7a09dedb0f80db52b0d117fa10713 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Tue, 23 Jul 2013 13:46:38 +0400
Subject: [PATCH] Allow gaps in index identifiers. Remove
 primary_indexes_enabled, secondary_indexes_enabled.

---
 src/box/box_lua_space.cc |   9 ++-
 src/box/space.cc         | 146 ++++++++++++++++-----------------------
 src/box/space.h          |  47 ++++++++++---
 3 files changed, 102 insertions(+), 100 deletions(-)

diff --git a/src/box/box_lua_space.cc b/src/box/box_lua_space.cc
index 13965f259d..c76c80e9e2 100644
--- a/src/box/box_lua_space.cc
+++ b/src/box/box_lua_space.cc
@@ -76,11 +76,14 @@ lbox_pushspace(struct lua_State *L, struct space *space)
 	 * Fill space.index table with
 	 * all defined indexes.
 	 */
-	for (int i = 0; i < space->key_count; i++) {
-		lua_pushnumber(L, i);
+	for (int i = 0; i <= space->index_id_max; i++) {
+		Index *index = space_index(space, i);
+		if (index == NULL)
+			continue;
+		struct key_def *key_def = &index->key_def;
+		lua_pushnumber(L, key_def->id);
 		lua_newtable(L);		/* space.index[i] */
 
-		struct key_def *key_def = &space->index[i]->key_def;
 		lua_pushstring(L, "unique");
 		lua_pushboolean(L, key_def->is_unique);
 		lua_settable(L, -3);
diff --git a/src/box/space.cc b/src/box/space.cc
index eacf37cd73..f0d8cc7192 100644
--- a/src/box/space.cc
+++ b/src/box/space.cc
@@ -46,26 +46,27 @@ extern "C" {
 
 static struct mh_i32ptr_t *spaces;
 
-/**
- * Secondary indexes are built in bulk after all data is
- * recovered. This flag indicates that the indexes are
- * already built and ready for use.
- */
-static bool secondary_indexes_enabled = false;
-/**
- * Primary indexes are enabled only after reading the snapshot.
- */
-static bool primary_indexes_enabled = false;
+struct space *
+space_new(struct space_def *space_def, struct key_def *key_defs,
+	  uint32_t key_count)
+{
+	struct space *space = space_by_id(space_def->id);
+	if (space)
+		tnt_raise(LoggedError, ER_SPACE_EXISTS, space_def->id);
 
+	uint32_t index_id_max = 0;
+	for (uint32_t j = 0; j < key_count; ++j)
+		index_id_max = MAX(index_id_max, key_defs[j].id);
 
-static void
-space_create(struct space *space, struct space_def *def,
-	     struct key_def *key_defs, uint32_t key_count)
-{
-	memset(space, 0, sizeof(struct space));
-	space->def = *def;
-	space->key_count = key_count;
+	size_t sz = sizeof(struct space) +
+		(key_count + index_id_max + 1) * sizeof(Index *);
+	space = (struct space *) calloc(1, sz);
+
+	space->index_map = (Index **)((char *) space + sizeof(*space) +
+				      key_count * sizeof(Index *));
+	space->def = *space_def;
 	space->format = tuple_format_new(key_defs, key_count);
+	space->index_id_max = index_id_max;
 	/* fill space indexes */
 	for (uint32_t j = 0; j < key_count; ++j) {
 		struct key_def *key_def = &key_defs[j];
@@ -74,30 +75,13 @@ space_create(struct space *space, struct space_def *def,
 			tnt_raise(LoggedError, ER_MEMORY_ISSUE,
 				  "class Index", "malloc");
 		}
-		space->index[j] = index;
+		space->index_map[key_def->id] = index;
 	}
-}
-
-static void
-space_destroy(struct space *space)
-{
-	for (uint32_t j = 0 ; j < space->key_count; j++) {
-		Index *index = space->index[j];
-		delete index;
-	}
-}
-
-struct space *
-space_new(struct space_def *space_def, struct key_def *key_defs,
-	  uint32_t key_count)
-{
-	struct space *space = space_by_id(space_def->id);
-	if (space)
-		tnt_raise(LoggedError, ER_SPACE_EXISTS, space_def->id);
-
-	space = (struct space *) malloc(sizeof(struct space));
-
-	space_create(space, space_def, key_defs, key_count);
+	/*
+	 * Initialize the primary key, but do not the secondary
+	 * keys - they are built by space_build_secondary_keys().
+	 */
+	space->index[space->index_count++] = space->index_map[0];
 
 	const struct mh_i32ptr_node_t node = { space_id(space), space };
 	mh_i32ptr_put(spaces, &node, NULL, NULL);
@@ -110,6 +94,28 @@ space_new(struct space_def *space_def, struct key_def *key_defs,
 	return space;
 }
 
+void
+space_build_secondary_keys(struct space *space)
+{
+	if (space->index_id_max == 0)
+		return; /* no secondary keys */
+
+	say_info("Building secondary keys in space %d...",
+		 space_id(space));
+
+	Index *pk = space->index_map[0];
+
+	for (uint32_t j = 1; j <= space->index_id_max; j++) {
+		Index *index = space->index_map[j];
+		if (index) {
+			index_build(index, pk);
+			space->index[space->index_count++] = index;
+		}
+	}
+
+	say_info("Space %d: done", space_id(space));
+}
+
 static void
 space_delete(struct space *space)
 {
@@ -119,7 +125,8 @@ space_delete(struct space *space)
 	mh_int_t k = mh_i32ptr_get(spaces, &node, NULL);
 	assert(k != mh_end(spaces));
 	mh_i32ptr_del(spaces, k, NULL);
-	space_destroy(space);
+	for (uint32_t j = 0 ; j <= space->index_id_max; j++)
+		delete space->index_map[j];
 	free(space);
 }
 
@@ -134,22 +141,6 @@ space_by_id(uint32_t id)
 	return (struct space *) mh_i32ptr_node(spaces, space)->val;
 }
 
-/** Return the number of active indexes in a space. */
-static inline int
-index_count(struct space *sp)
-{
-	if (!secondary_indexes_enabled) {
-		/* If secondary indexes are not enabled yet,
-		   we can use only the primary index. So return
-		   1 if there is at least one index (which
-		   must be primary) and return 0 otherwise. */
-		return sp->key_count > 0;
-	} else {
-		/* Return the actual number of indexes. */
-		return sp->key_count;
-	}
-}
-
 /**
  * Visit all enabled spaces and apply 'func'.
  */
@@ -165,13 +156,13 @@ space_foreach(void (*func)(struct space *sp, void *udata), void *udata) {
 }
 
 struct tuple *
-space_replace(struct space *sp, struct tuple *old_tuple,
+space_replace(struct space *space, struct tuple *old_tuple,
 	      struct tuple *new_tuple, enum dup_replace_mode mode)
 {
 	uint32_t i = 0;
 	try {
 		/* Update the primary key */
-		Index *pk = sp->index[0];
+		Index *pk = space->index[0];
 		assert(pk->key_def.is_unique);
 		/*
 		 * If old_tuple is not NULL, the index
@@ -181,17 +172,20 @@ space_replace(struct space *sp, struct tuple *old_tuple,
 		old_tuple = pk->replace(old_tuple, new_tuple, mode);
 
 		assert(old_tuple || new_tuple);
-		uint32_t n = index_count(sp);
-		/* Update secondary keys */
-		for (i = i + 1; i < n; i++) {
-			Index *index = sp->index[i];
+		/*
+		 * Update secondary keys. When loading data from
+		 * the WAL secondary keys are not enabled
+		 * (index_count is 1).
+		 */
+		for (i++; i < space->index_count; i++) {
+			Index *index = space->index[i];
 			index->replace(old_tuple, new_tuple, DUP_INSERT);
 		}
 		return old_tuple;
-	} catch (const Exception& e) {
+	} catch (const Exception &e) {
 		/* Rollback all changes */
 		for (; i > 0; i--) {
-			Index *index = sp->index[i-1];
+			Index *index = space->index[i-1];
 			index->replace(new_tuple, old_tuple, DUP_INSERT);
 		}
 		throw;
@@ -312,8 +306,6 @@ space_init(void)
 void
 begin_build_primary_indexes(void)
 {
-	assert(primary_indexes_enabled == false);
-
 	mh_int_t i;
 
 	mh_foreach(spaces, i) {
@@ -334,36 +326,18 @@ end_build_primary_indexes(void)
 		Index *index = space->index[0];
 		index->endBuild();
 	}
-	primary_indexes_enabled = true;
 }
 
 void
 build_secondary_indexes(void)
 {
-	assert(primary_indexes_enabled == true);
-	assert(secondary_indexes_enabled == false);
-
 	mh_int_t i;
 	mh_foreach(spaces, i) {
 		struct space *space = (struct space *)
 				mh_i32ptr_node(spaces, i)->val;
 
-		if (space->key_count <= 1)
-			continue; /* no secondary keys */
-
-		say_info("Building secondary keys in space %d...",
-			 space_id(space));
-
-		Index *pk = space->index[0];
-
-		for (uint32_t j = 1; j < space->key_count; j++)
-			index_build(space->index[j], pk);
-
-		say_info("Space %d: done", space_id(space));
+		space_build_secondary_keys(space);
 	}
-
-	/* enable secondary indexes now */
-	secondary_indexes_enabled = true;
 }
 
 int
diff --git a/src/box/space.h b/src/box/space.h
index c695877a24..d3b6c99b74 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -37,24 +37,42 @@
 struct tarantool_cfg;
 
 struct space {
-	Index *index[BOX_INDEX_MAX];
 	/**
-	 * The number of indexes in the space.
+	 * The number of *enabled* indexes in the space.
 	 *
-	 * It is equal to the number of non-nil members of the index
-	 * array and defines the key_defs array size as well.
+	 * After all indexes are built, it is equal to the number
+	 * of non-nil members of the index[] array.
 	 */
-	uint32_t key_count;
+	uint32_t index_count;
+	/**
+	 * There may be gaps index ids, i.e. index 0 and 2 may exist,
+	 * while index 1 is not defined. This member stores the
+	 * max id of a defined index in the space. It defines the
+	 * size of index_map array.
+	 */
+	uint32_t index_id_max;
 	/** Space meta. */
 	struct space_def def;
 
 	/** Default tuple format used by this space */
 	struct tuple_format *format;
+	/**
+	 * Sparse array of indexes defined on the space, indexed
+	 * by id. Used to quickly find index by id (for SELECTs).
+	 */
+	Index **index_map;
+	/**
+	 * Dense array of indexes defined on the space, in order
+	 * of index id. Initially stores only the primary key at
+	 * position 0, and is fully built by
+	 * space_build_secondary_keys().
+	 */
+	Index *index[];
 };
 
-
 /** Get space ordinal number. */
-static inline uint32_t space_id(struct space *space) { return space->def.id; }
+static inline uint32_t
+space_id(struct space *space) { return space->def.id; }
 
 /**
  * @brief A single method to handle REPLACE, DELETE and UPDATE.
@@ -153,14 +171,14 @@ void
 space_validate_tuple(struct space *sp, struct tuple *new_tuple);
 
 /**
- * Get index by index number.
+ * Get index by index id.
  * @return NULL if index not found.
  */
 static inline Index *
-space_index(struct space *sp, uint32_t index_no)
+space_index(struct space *space, uint32_t id)
 {
-	if (index_no < BOX_INDEX_MAX)
-		return sp->index[index_no];
+	if (id <= space->index_id_max)
+		return space->index_map[id];
 	return NULL;
 }
 
@@ -191,6 +209,13 @@ struct space *
 space_new(struct space_def *space_def,
 	  struct key_def *key_defs, uint32_t key_count);
 
+/**
+ * Secondary indexes are built in bulk after all data is
+ * recovered. This flag indicates that the indexes are
+ * already built and ready for use.
+ */
+void
+space_build_secondary_keys(struct space *space);
 
 void space_init(void);
 void space_free(void);
-- 
GitLab