diff --git a/src/box/box_lua_space.cc b/src/box/box_lua_space.cc index 13965f259d9503f6ba6e2c550144f2aa34f842d2..c76c80e9e2fc3cc54600b37f9d0f779b5feba598 100644 --- a/src/box/box_lua_space.cc +++ b/src/box/box_lua_space.cc @@ -76,11 +76,14 @@ lbox_pushspace(struct lua_State *L, struct space *space) * Fill space.index table with * all defined indexes. */ - for (int i = 0; i < space->key_count; i++) { - lua_pushnumber(L, i); + for (int i = 0; i <= space->index_id_max; i++) { + Index *index = space_index(space, i); + if (index == NULL) + continue; + struct key_def *key_def = &index->key_def; + lua_pushnumber(L, key_def->id); lua_newtable(L); /* space.index[i] */ - struct key_def *key_def = &space->index[i]->key_def; lua_pushstring(L, "unique"); lua_pushboolean(L, key_def->is_unique); lua_settable(L, -3); diff --git a/src/box/space.cc b/src/box/space.cc index eacf37cd731b35694e9eb0049121cac16ed0126c..f0d8cc7192efa45f454d85f7c015e009a4071beb 100644 --- a/src/box/space.cc +++ b/src/box/space.cc @@ -46,26 +46,27 @@ extern "C" { static struct mh_i32ptr_t *spaces; -/** - * Secondary indexes are built in bulk after all data is - * recovered. This flag indicates that the indexes are - * already built and ready for use. - */ -static bool secondary_indexes_enabled = false; -/** - * Primary indexes are enabled only after reading the snapshot. - */ -static bool primary_indexes_enabled = false; +struct space * +space_new(struct space_def *space_def, struct key_def *key_defs, + uint32_t key_count) +{ + struct space *space = space_by_id(space_def->id); + if (space) + tnt_raise(LoggedError, ER_SPACE_EXISTS, space_def->id); + uint32_t index_id_max = 0; + for (uint32_t j = 0; j < key_count; ++j) + index_id_max = MAX(index_id_max, key_defs[j].id); -static void -space_create(struct space *space, struct space_def *def, - struct key_def *key_defs, uint32_t key_count) -{ - memset(space, 0, sizeof(struct space)); - space->def = *def; - space->key_count = key_count; + size_t sz = sizeof(struct space) + + (key_count + index_id_max + 1) * sizeof(Index *); + space = (struct space *) calloc(1, sz); + + space->index_map = (Index **)((char *) space + sizeof(*space) + + key_count * sizeof(Index *)); + space->def = *space_def; space->format = tuple_format_new(key_defs, key_count); + space->index_id_max = index_id_max; /* fill space indexes */ for (uint32_t j = 0; j < key_count; ++j) { struct key_def *key_def = &key_defs[j]; @@ -74,30 +75,13 @@ space_create(struct space *space, struct space_def *def, tnt_raise(LoggedError, ER_MEMORY_ISSUE, "class Index", "malloc"); } - space->index[j] = index; + space->index_map[key_def->id] = index; } -} - -static void -space_destroy(struct space *space) -{ - for (uint32_t j = 0 ; j < space->key_count; j++) { - Index *index = space->index[j]; - delete index; - } -} - -struct space * -space_new(struct space_def *space_def, struct key_def *key_defs, - uint32_t key_count) -{ - struct space *space = space_by_id(space_def->id); - if (space) - tnt_raise(LoggedError, ER_SPACE_EXISTS, space_def->id); - - space = (struct space *) malloc(sizeof(struct space)); - - space_create(space, space_def, key_defs, key_count); + /* + * Initialize the primary key, but do not the secondary + * keys - they are built by space_build_secondary_keys(). + */ + space->index[space->index_count++] = space->index_map[0]; const struct mh_i32ptr_node_t node = { space_id(space), space }; mh_i32ptr_put(spaces, &node, NULL, NULL); @@ -110,6 +94,28 @@ space_new(struct space_def *space_def, struct key_def *key_defs, return space; } +void +space_build_secondary_keys(struct space *space) +{ + if (space->index_id_max == 0) + return; /* no secondary keys */ + + say_info("Building secondary keys in space %d...", + space_id(space)); + + Index *pk = space->index_map[0]; + + for (uint32_t j = 1; j <= space->index_id_max; j++) { + Index *index = space->index_map[j]; + if (index) { + index_build(index, pk); + space->index[space->index_count++] = index; + } + } + + say_info("Space %d: done", space_id(space)); +} + static void space_delete(struct space *space) { @@ -119,7 +125,8 @@ space_delete(struct space *space) mh_int_t k = mh_i32ptr_get(spaces, &node, NULL); assert(k != mh_end(spaces)); mh_i32ptr_del(spaces, k, NULL); - space_destroy(space); + for (uint32_t j = 0 ; j <= space->index_id_max; j++) + delete space->index_map[j]; free(space); } @@ -134,22 +141,6 @@ space_by_id(uint32_t id) return (struct space *) mh_i32ptr_node(spaces, space)->val; } -/** Return the number of active indexes in a space. */ -static inline int -index_count(struct space *sp) -{ - if (!secondary_indexes_enabled) { - /* If secondary indexes are not enabled yet, - we can use only the primary index. So return - 1 if there is at least one index (which - must be primary) and return 0 otherwise. */ - return sp->key_count > 0; - } else { - /* Return the actual number of indexes. */ - return sp->key_count; - } -} - /** * Visit all enabled spaces and apply 'func'. */ @@ -165,13 +156,13 @@ space_foreach(void (*func)(struct space *sp, void *udata), void *udata) { } struct tuple * -space_replace(struct space *sp, struct tuple *old_tuple, +space_replace(struct space *space, struct tuple *old_tuple, struct tuple *new_tuple, enum dup_replace_mode mode) { uint32_t i = 0; try { /* Update the primary key */ - Index *pk = sp->index[0]; + Index *pk = space->index[0]; assert(pk->key_def.is_unique); /* * If old_tuple is not NULL, the index @@ -181,17 +172,20 @@ space_replace(struct space *sp, struct tuple *old_tuple, old_tuple = pk->replace(old_tuple, new_tuple, mode); assert(old_tuple || new_tuple); - uint32_t n = index_count(sp); - /* Update secondary keys */ - for (i = i + 1; i < n; i++) { - Index *index = sp->index[i]; + /* + * Update secondary keys. When loading data from + * the WAL secondary keys are not enabled + * (index_count is 1). + */ + for (i++; i < space->index_count; i++) { + Index *index = space->index[i]; index->replace(old_tuple, new_tuple, DUP_INSERT); } return old_tuple; - } catch (const Exception& e) { + } catch (const Exception &e) { /* Rollback all changes */ for (; i > 0; i--) { - Index *index = sp->index[i-1]; + Index *index = space->index[i-1]; index->replace(new_tuple, old_tuple, DUP_INSERT); } throw; @@ -312,8 +306,6 @@ space_init(void) void begin_build_primary_indexes(void) { - assert(primary_indexes_enabled == false); - mh_int_t i; mh_foreach(spaces, i) { @@ -334,36 +326,18 @@ end_build_primary_indexes(void) Index *index = space->index[0]; index->endBuild(); } - primary_indexes_enabled = true; } void build_secondary_indexes(void) { - assert(primary_indexes_enabled == true); - assert(secondary_indexes_enabled == false); - mh_int_t i; mh_foreach(spaces, i) { struct space *space = (struct space *) mh_i32ptr_node(spaces, i)->val; - if (space->key_count <= 1) - continue; /* no secondary keys */ - - say_info("Building secondary keys in space %d...", - space_id(space)); - - Index *pk = space->index[0]; - - for (uint32_t j = 1; j < space->key_count; j++) - index_build(space->index[j], pk); - - say_info("Space %d: done", space_id(space)); + space_build_secondary_keys(space); } - - /* enable secondary indexes now */ - secondary_indexes_enabled = true; } int diff --git a/src/box/space.h b/src/box/space.h index c695877a24fb6262b64aee9eea0c704197b79ba8..d3b6c99b7439165c0ac3a342aad86cfa8d096f75 100644 --- a/src/box/space.h +++ b/src/box/space.h @@ -37,24 +37,42 @@ struct tarantool_cfg; struct space { - Index *index[BOX_INDEX_MAX]; /** - * The number of indexes in the space. + * The number of *enabled* indexes in the space. * - * It is equal to the number of non-nil members of the index - * array and defines the key_defs array size as well. + * After all indexes are built, it is equal to the number + * of non-nil members of the index[] array. */ - uint32_t key_count; + uint32_t index_count; + /** + * There may be gaps index ids, i.e. index 0 and 2 may exist, + * while index 1 is not defined. This member stores the + * max id of a defined index in the space. It defines the + * size of index_map array. + */ + uint32_t index_id_max; /** Space meta. */ struct space_def def; /** Default tuple format used by this space */ struct tuple_format *format; + /** + * Sparse array of indexes defined on the space, indexed + * by id. Used to quickly find index by id (for SELECTs). + */ + Index **index_map; + /** + * Dense array of indexes defined on the space, in order + * of index id. Initially stores only the primary key at + * position 0, and is fully built by + * space_build_secondary_keys(). + */ + Index *index[]; }; - /** Get space ordinal number. */ -static inline uint32_t space_id(struct space *space) { return space->def.id; } +static inline uint32_t +space_id(struct space *space) { return space->def.id; } /** * @brief A single method to handle REPLACE, DELETE and UPDATE. @@ -153,14 +171,14 @@ void space_validate_tuple(struct space *sp, struct tuple *new_tuple); /** - * Get index by index number. + * Get index by index id. * @return NULL if index not found. */ static inline Index * -space_index(struct space *sp, uint32_t index_no) +space_index(struct space *space, uint32_t id) { - if (index_no < BOX_INDEX_MAX) - return sp->index[index_no]; + if (id <= space->index_id_max) + return space->index_map[id]; return NULL; } @@ -191,6 +209,13 @@ struct space * space_new(struct space_def *space_def, struct key_def *key_defs, uint32_t key_count); +/** + * Secondary indexes are built in bulk after all data is + * recovered. This flag indicates that the indexes are + * already built and ready for use. + */ +void +space_build_secondary_keys(struct space *space); void space_init(void); void space_free(void);