From 7a0f2898aa1e328b74d8d508a3b883efcf3078f1 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov <vdavydov.dev@gmail.com> Date: Sun, 5 Nov 2017 15:05:56 +0300 Subject: [PATCH] key_def: do not lookup collation when decoding parts We can't use key_def_decode_parts() when recovering vylog if key_def has a collation, because vylog is recovered before the snapshot, i.e. when collation objects haven't been created yet, while key_def_decode_parts() tries to look up the collation by id. As a result, we can't enable collations for vinyl indexes. To fix this, let's rework the decoding procedure so that it works with struct key_part_def instead of key_part. The only difference between the two structures is that the former references the collation by id while the latter by pointer. Needed for #2822 --- src/box/alter.cc | 23 ++++-- src/box/key_def.cc | 181 +++++++++++++++++++++++++-------------------- src/box/key_def.h | 48 ++++++++++-- src/box/vinyl.c | 3 +- src/box/vy_log.c | 96 ++++++++++++++---------- src/box/vy_log.h | 5 ++ 6 files changed, 221 insertions(+), 135 deletions(-) diff --git a/src/box/alter.cc b/src/box/alter.cc index 9215c07fd5..83b6bfcc73 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -268,22 +268,33 @@ index_def_new_from_tuple(struct tuple *tuple, struct space *space) tt_cstr(name, BOX_INVALID_NAME_MAX), space_name(space), "index name is too long"); } - struct key_def *key_def = key_def_new(part_count); - if (key_def == NULL) - diag_raise(); - auto key_def_guard = make_scoped_guard([=] { box_key_def_delete(key_def); }); + struct key_def *key_def = NULL; + struct key_part_def *part_def = (struct key_part_def *) + malloc(sizeof(*part_def) * part_count); + if (part_def == NULL) { + tnt_raise(OutOfMemory, sizeof(*part_def) * part_count, + "malloc", "key_part_def"); + } + auto key_def_guard = make_scoped_guard([=] { + free(part_def); + free(key_def); + }); if (is_166plus) { /* 1.6.6+ */ - if (key_def_decode_parts(key_def, &parts, space->def->fields, + if (key_def_decode_parts(part_def, part_count, &parts, + space->def->fields, space->def->field_count) != 0) diag_raise(); } else { /* 1.6.5- TODO: remove it in newer versions, find all 1.6.5- */ - if (key_def_decode_parts_160(key_def, &parts, + if (key_def_decode_parts_160(part_def, part_count, &parts, space->def->fields, space->def->field_count) != 0) diag_raise(); } + key_def = key_def_new_with_parts(part_def, part_count); + if (key_def == NULL) + diag_raise(); struct index_def *index_def = index_def_new(id, index_id, name, name_len, type, &opts, key_def, space_index_key_def(space, 0)); diff --git a/src/box/key_def.cc b/src/box/key_def.cc index c74a0ab761..d0cfdf51e4 100644 --- a/src/box/key_def.cc +++ b/src/box/key_def.cc @@ -35,21 +35,10 @@ #include "schema_def.h" #include "coll_cache.h" -struct key_part_def { - /** Tuple field index for this part. */ - uint32_t fieldno; - /** Type of the tuple field. */ - enum field_type type; - /** Collation ID for string comparison. */ - uint32_t coll_id; - /** True if a key part can store NULLs. */ - bool is_nullable; -}; - static const struct key_part_def key_part_def_default = { 0, field_type_MAX, - UINT32_MAX, + COLL_NONE, false, }; @@ -147,6 +136,45 @@ key_def_new(uint32_t part_count) return key_def; } +struct key_def * +key_def_new_with_parts(struct key_part_def *parts, uint32_t part_count) +{ + struct key_def *def = key_def_new(part_count); + if (def == NULL) + return NULL; + + for (uint32_t i = 0; i < part_count; i++) { + struct key_part_def *part = &parts[i]; + struct coll *coll = NULL; + if (part->coll_id != COLL_NONE) { + coll = coll_cache_find(part->coll_id); + if (coll == NULL) { + diag_set(ClientError, ER_WRONG_INDEX_OPTIONS, + i + 1, "collation was not found by ID"); + free(def); + return NULL; + } + } + key_def_set_part(def, i, part->fieldno, part->type, + part->is_nullable, coll); + } + return def; +} + +void +key_def_dump_parts(const struct key_def *def, struct key_part_def *parts) +{ + for (uint32_t i = 0; i < def->part_count; i++) { + const struct key_part *part = &def->parts[i]; + struct key_part_def *part_def = &parts[i]; + part_def->fieldno = part->fieldno; + part_def->type = part->type; + part_def->is_nullable = part->is_nullable; + part_def->coll_id = (part->coll != NULL ? + part->coll->id : COLL_NONE); + } +} + box_key_def_t * box_key_def_new(uint32_t *fields, uint32_t *types, uint32_t part_count) { @@ -243,16 +271,17 @@ key_def_set_part(struct key_def *def, uint32_t part_no, uint32_t fieldno, } int -key_def_snprint(char *buf, int size, const struct key_def *key_def) +key_def_snprint_parts(char *buf, int size, const struct key_part_def *parts, + uint32_t part_count) { int total = 0; SNPRINT(total, snprintf, buf, size, "["); - for (uint32_t i = 0; i < key_def->part_count; i++) { - const struct key_part *part = &key_def->parts[i]; + for (uint32_t i = 0; i < part_count; i++) { + const struct key_part_def *part = &parts[i]; assert(part->type < field_type_MAX); SNPRINT(total, snprintf, buf, size, "%d, '%s'", (int)part->fieldno, field_type_strs[part->type]); - if (i < key_def->part_count - 1) + if (i < part_count - 1) SNPRINT(total, snprintf, buf, size, ", "); } SNPRINT(total, snprintf, buf, size, "]"); @@ -260,12 +289,12 @@ key_def_snprint(char *buf, int size, const struct key_def *key_def) } size_t -key_def_sizeof_parts(const struct key_def *key_def) +key_def_sizeof_parts(const struct key_part_def *parts, uint32_t part_count) { size_t size = 0; - for (uint32_t i = 0; i < key_def->part_count; i++) { - const struct key_part *part = &key_def->parts[i]; - size += mp_sizeof_map(part->coll == NULL ? 2 : 3); + for (uint32_t i = 0; i < part_count; i++) { + const struct key_part_def *part = &parts[i]; + size += mp_sizeof_map(part->coll_id == COLL_NONE ? 2 : 3); const char *opt = field_type_option_strs[PART_OPTION_FIELD]; size += mp_sizeof_str(strlen(opt)); size += mp_sizeof_uint(part->fieldno); @@ -273,21 +302,22 @@ key_def_sizeof_parts(const struct key_def *key_def) opt = field_type_option_strs[PART_OPTION_TYPE]; size += mp_sizeof_str(strlen(opt)); size += mp_sizeof_str(strlen(field_type_strs[part->type])); - if (part->coll != NULL) { + if (part->coll_id != COLL_NONE) { opt = field_type_option_strs[PART_OPTION_COLLATION]; size += mp_sizeof_str(strlen(opt)); - size += mp_sizeof_uint(part->coll->id); + size += mp_sizeof_uint(part->coll_id); } } return size; } char * -key_def_encode_parts(char *data, const struct key_def *key_def) +key_def_encode_parts(char *data, const struct key_part_def *parts, + uint32_t part_count) { - for (uint32_t i = 0; i < key_def->part_count; i++) { - const struct key_part *part = &key_def->parts[i]; - data = mp_encode_map(data, part->coll == NULL ? 2 : 3); + for (uint32_t i = 0; i < part_count; i++) { + const struct key_part_def *part = &parts[i]; + data = mp_encode_map(data, part->coll_id == COLL_NONE ? 2 : 3); const char *opt = field_type_option_strs[PART_OPTION_FIELD]; data = mp_encode_str(data, opt, strlen(opt)); data = mp_encode_uint(data, part->fieldno); @@ -296,10 +326,10 @@ key_def_encode_parts(char *data, const struct key_def *key_def) assert(part->type < field_type_MAX); const char *type_str = field_type_strs[part->type]; data = mp_encode_str(data, type_str, strlen(type_str)); - if (part->coll != NULL) { + if (part->coll_id != COLL_NONE) { opt = field_type_option_strs[PART_OPTION_COLLATION]; data = mp_encode_str(data, opt, strlen(opt)); - data = mp_encode_uint(data, part->coll->id); + data = mp_encode_uint(data, part->coll_id); } } return data; @@ -314,10 +344,12 @@ key_def_encode_parts(char *data, const struct key_def *key_def) * [NUM, STR, ..][NUM, STR, ..].., */ static int -key_def_decode_parts_166(struct key_def *key_def, const char **data, - const struct field_def *fields, uint32_t field_count) +key_def_decode_parts_166(struct key_part_def *parts, uint32_t part_count, + const char **data, const struct field_def *fields, + uint32_t field_count) { - for (uint32_t i = 0; i < key_def->part_count; i++) { + for (uint32_t i = 0; i < part_count; i++) { + struct key_part_def *part = &parts[i]; if (mp_typeof(**data) != MP_ARRAY) { diag_set(ClientError, ER_WRONG_INDEX_PARTS, "expected an array"); @@ -339,7 +371,7 @@ key_def_decode_parts_166(struct key_def *key_def, const char **data, "field id must be an integer"); return -1; } - uint32_t field_no = (uint32_t) mp_decode_uint(data); + part->fieldno = (uint32_t) mp_decode_uint(data); if (mp_typeof(**data) != MP_STR) { diag_set(ClientError, ER_WRONG_INDEX_PARTS, "field type must be a string"); @@ -349,94 +381,81 @@ key_def_decode_parts_166(struct key_def *key_def, const char **data, const char *str = mp_decode_str(data, &len); for (uint32_t j = 2; j < item_count; j++) mp_next(data); - enum field_type field_type = field_type_by_name(str, len); - if (field_type == field_type_MAX) { + part->type = field_type_by_name(str, len); + if (part->type == field_type_MAX) { diag_set(ClientError, ER_WRONG_INDEX_PARTS, "unknown field type"); return -1; } - bool is_nullable; - if (field_no < field_count) - is_nullable = fields[field_no].is_nullable; - else - is_nullable = key_part_def_default.is_nullable; - key_def_set_part(key_def, i, field_no, field_type, is_nullable, - NULL); + part->is_nullable = (part->fieldno < field_count ? + fields[part->fieldno].is_nullable : + key_part_def_default.is_nullable); + part->coll_id = COLL_NONE; } return 0; } int -key_def_decode_parts(struct key_def *key_def, const char **data, - const struct field_def *fields, uint32_t field_count) +key_def_decode_parts(struct key_part_def *parts, uint32_t part_count, + const char **data, const struct field_def *fields, + uint32_t field_count) { if (mp_typeof(**data) == MP_ARRAY) { - return key_def_decode_parts_166(key_def, data, fields, - field_count); + return key_def_decode_parts_166(parts, part_count, data, + fields, field_count); } - for (uint32_t i = 0; i < key_def->part_count; i++) { + for (uint32_t i = 0; i < part_count; i++) { + struct key_part_def *part = &parts[i]; if (mp_typeof(**data) != MP_MAP) { diag_set(ClientError, ER_WRONG_INDEX_OPTIONS, i + TUPLE_INDEX_BASE, "index part is expected to be a map"); return -1; } - struct key_part_def part = key_part_def_default; - if (opts_decode(&part, part_def_reg, data, + *part = key_part_def_default; + if (opts_decode(part, part_def_reg, data, ER_WRONG_INDEX_OPTIONS, i + TUPLE_INDEX_BASE, NULL) != 0) return -1; - if (part.type == field_type_MAX) { + if (part->type == field_type_MAX) { diag_set(ClientError, ER_WRONG_INDEX_OPTIONS, i + TUPLE_INDEX_BASE, "index part: unknown field type"); return -1; } - struct coll *coll = NULL; - if (part.coll_id != UINT32_MAX) { - if (part.type != FIELD_TYPE_STRING && - part.type != FIELD_TYPE_SCALAR) { - diag_set(ClientError, ER_WRONG_INDEX_OPTIONS, - i + 1, - "collation is reasonable only for " - "string and scalar parts"); - return -1; - } - coll = coll_cache_find(part.coll_id); - if (coll == NULL) { - diag_set(ClientError, ER_WRONG_INDEX_OPTIONS, - i + 1, - "collation was not found by ID"); - return -1; - } + if (part->coll_id != COLL_NONE && + part->type != FIELD_TYPE_STRING && + part->type != FIELD_TYPE_SCALAR) { + diag_set(ClientError, ER_WRONG_INDEX_OPTIONS, + i + 1, + "collation is reasonable only for " + "string and scalar parts"); + return -1; } - key_def_set_part(key_def, i, part.fieldno, part.type, - part.is_nullable, coll); } return 0; } int -key_def_decode_parts_160(struct key_def *key_def, const char **data, - const struct field_def *fields, uint32_t field_count) +key_def_decode_parts_160(struct key_part_def *parts, uint32_t part_count, + const char **data, const struct field_def *fields, + uint32_t field_count) { - for (uint32_t i = 0; i < key_def->part_count; i++) { - uint32_t field_no = (uint32_t) mp_decode_uint(data); + for (uint32_t i = 0; i < part_count; i++) { + struct key_part_def *part = &parts[i]; + part->fieldno = (uint32_t) mp_decode_uint(data); uint32_t len; const char *str = mp_decode_str(data, &len); - enum field_type field_type = field_type_by_name(str, len); - if (field_type == field_type_MAX) { + part->type = field_type_by_name(str, len); + if (part->type == field_type_MAX) { diag_set(ClientError, ER_WRONG_INDEX_PARTS, "unknown field type"); return -1; } - bool is_nullable; - if (field_no < field_count) - is_nullable = fields[field_no].is_nullable; - else - is_nullable = key_part_def_default.is_nullable; - key_def_set_part(key_def, i, field_no, field_type, is_nullable, - NULL); + part->is_nullable = (part->fieldno < field_count ? + fields[part->fieldno].is_nullable : + key_part_def_default.is_nullable); + part->coll_id = COLL_NONE; } return 0; } diff --git a/src/box/key_def.h b/src/box/key_def.h index 6580a397c2..201863f5a6 100644 --- a/src/box/key_def.h +++ b/src/box/key_def.h @@ -45,6 +45,23 @@ extern "C" { /* MsgPack type names */ extern const char *mp_type_strs[]; +struct key_part_def { + /** Tuple field index for this part. */ + uint32_t fieldno; + /** Type of the tuple field. */ + enum field_type type; + /** Collation ID for string comparison. */ + uint32_t coll_id; + /** True if a key part can store NULLs. */ + bool is_nullable; +}; + +/** + * Set key_part_def.coll_id to COLL_NONE if + * the field does not have a collation. + */ +#define COLL_NONE UINT32_MAX + /** Descriptor of a single part in a multipart key. */ struct key_part { /** Tuple field index for this part */ @@ -165,6 +182,19 @@ key_def_sizeof(uint32_t part_count) struct key_def * key_def_new(uint32_t part_count); +/** + * Allocate a new key_def with the given part count + * and initialize its parts. + */ +struct key_def * +key_def_new_with_parts(struct key_part_def *parts, uint32_t part_count); + +/** + * Dump part definitions of the given key def. + */ +void +key_def_dump_parts(const struct key_def *def, struct key_part_def *parts); + /** * Set a single key part in a key def. * @pre part_no < part_count @@ -177,21 +207,23 @@ key_def_set_part(struct key_def *def, uint32_t part_no, uint32_t fieldno, * An snprint-style function to print a key definition. */ int -key_def_snprint(char *buf, int size, const struct key_def *key_def); +key_def_snprint_parts(char *buf, int size, const struct key_part_def *parts, + uint32_t part_count); /** * Return size of key parts array when encoded in MsgPack. * See also key_def_encode_parts(). */ size_t -key_def_sizeof_parts(const struct key_def *key_def); +key_def_sizeof_parts(const struct key_part_def *parts, uint32_t part_count); /** * Encode key parts array in MsgPack and return a pointer following * the end of encoded data. */ char * -key_def_encode_parts(char *data, const struct key_def *key_def); +key_def_encode_parts(char *data, const struct key_part_def *parts, + uint32_t part_count); /** * 1.6.6+ @@ -204,8 +236,9 @@ key_def_encode_parts(char *data, const struct key_def *key_def); * {field=NUM, type=STR, ..}{field=NUM, type=STR, ..}.., */ int -key_def_decode_parts(struct key_def *key_def, const char **data, - const struct field_def *fields, uint32_t field_count); +key_def_decode_parts(struct key_part_def *parts, uint32_t part_count, + const char **data, const struct field_def *fields, + uint32_t field_count); /** * 1.6.0-1.6.5 @@ -216,8 +249,9 @@ key_def_decode_parts(struct key_def *key_def, const char **data, * NUM, STR, NUM, STR, .., */ int -key_def_decode_parts_160(struct key_def *key_def, const char **data, - const struct field_def *fields, uint32_t field_count); +key_def_decode_parts_160(struct key_part_def *parts, uint32_t part_count, + const char **data, const struct field_def *fields, + uint32_t field_count); /** * Returns the part in index_def->parts for the specified fieldno. diff --git a/src/box/vinyl.c b/src/box/vinyl.c index ad6e6f5a8c..4159b635df 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -2610,7 +2610,8 @@ vy_join_cb(const struct vy_log_record *record, void *arg) ctx->index_id = record->index_id; if (ctx->key_def != NULL) free(ctx->key_def); - ctx->key_def = key_def_dup(record->key_def); + ctx->key_def = key_def_new_with_parts(record->key_parts, + record->key_part_count); if (ctx->key_def == NULL) return -1; if (ctx->format != NULL) diff --git a/src/box/vy_log.c b/src/box/vy_log.c index be50b357bb..381446cd1d 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -194,8 +194,10 @@ struct vy_index_recovery_info { uint32_t index_id; /** Space ID. */ uint32_t space_id; - /** Index key definition. */ - struct key_def *key_def; + /** Array of key part definitions. */ + struct key_part_def *key_parts; + /** Number of key parts. */ + uint32_t key_part_count; /** True if the index was dropped. */ bool is_dropped; /** LSN of the last index dump. */ @@ -328,10 +330,11 @@ vy_log_record_snprint(char *buf, int size, const struct vy_log_record *record) SNPRINT(total, snprintf, buf, size, "%s=%"PRIu32", ", vy_log_key_name[VY_LOG_KEY_SPACE_ID], record->space_id); - if (record->key_def != NULL) { + if (record->key_parts != NULL) { SNPRINT(total, snprintf, buf, size, "%s=", vy_log_key_name[VY_LOG_KEY_DEF]); - SNPRINT(total, key_def_snprint, buf, size, record->key_def); + SNPRINT(total, key_def_snprint_parts, buf, size, + record->key_parts, record->key_part_count); SNPRINT(total, snprintf, buf, size, ", "); } if (record->slice_id > 0) @@ -434,10 +437,11 @@ vy_log_record_encode(const struct vy_log_record *record, size += mp_sizeof_uint(record->space_id); n_keys++; } - if (record->key_def != NULL) { + if (record->key_parts != NULL) { size += mp_sizeof_uint(VY_LOG_KEY_DEF); - size += mp_sizeof_array(record->key_def->part_count); - size += key_def_sizeof_parts(record->key_def); + size += mp_sizeof_array(record->key_part_count); + size += key_def_sizeof_parts(record->key_parts, + record->key_part_count); n_keys++; } if (record->slice_id > 0) { @@ -508,10 +512,11 @@ vy_log_record_encode(const struct vy_log_record *record, pos = mp_encode_uint(pos, VY_LOG_KEY_SPACE_ID); pos = mp_encode_uint(pos, record->space_id); } - if (record->key_def != NULL) { + if (record->key_parts != NULL) { pos = mp_encode_uint(pos, VY_LOG_KEY_DEF); - pos = mp_encode_array(pos, record->key_def->part_count); - pos = key_def_encode_parts(pos, record->key_def); + pos = mp_encode_array(pos, record->key_part_count); + pos = key_def_encode_parts(pos, record->key_parts, + record->key_part_count); } if (record->slice_id > 0) { pos = mp_encode_uint(pos, VY_LOG_KEY_SLICE_ID); @@ -615,26 +620,24 @@ vy_log_record_decode(struct vy_log_record *record, break; case VY_LOG_KEY_DEF: { uint32_t part_count = mp_decode_array(&pos); - struct key_def *key_def = - region_alloc(&fiber()->gc, key_def_sizeof(part_count)); - if (key_def == NULL) { + struct key_part_def *parts = region_alloc(&fiber()->gc, + sizeof(*parts) * part_count); + if (parts == NULL) { diag_set(OutOfMemory, - key_def_sizeof(part_count), - "region", "struct key_def"); + sizeof(*parts) * part_count, + "region", "struct key_part_def"); return -1; } - memset(key_def, 0, key_def_sizeof(part_count)); - key_def->is_nullable = false; - key_def->unique_part_count = part_count; - key_def->part_count = part_count; - if (key_def_decode_parts(key_def, &pos, NULL, 0) != 0) { + if (key_def_decode_parts(parts, part_count, &pos, + NULL, 0) != 0) { diag_log(); diag_set(ClientError, ER_INVALID_VYLOG_FILE, "Bad record: failed to decode " "index key definition"); goto fail; } - record->key_def = key_def; + record->key_parts = parts; + record->key_part_count = part_count; break; } case VY_LOG_KEY_SLICE_ID: @@ -705,11 +708,17 @@ vy_log_record_dup(struct region *pool, const struct vy_log_record *src) memcpy((char *)dst->end, src->end, size); } if (src->key_def != NULL) { - size_t size = key_def_sizeof(src->key_def->part_count); - dst->key_def = region_alloc(pool, size); - if (dst->key_def == NULL) + size_t size = src->key_def->part_count * + sizeof(struct key_part_def); + dst->key_parts = region_alloc(pool, size); + if (dst->key_parts == NULL) { + diag_set(OutOfMemory, size, "region", + "struct key_part_def"); goto err; - memcpy((char*) dst->key_def, src->key_def, size); + } + key_def_dump_parts(src->key_def, dst->key_parts); + dst->key_part_count = src->key_def->part_count; + dst->key_def = NULL; } return dst; @@ -1222,8 +1231,6 @@ vy_log_write(const struct vy_log_record *record) { assert(latch_owner(&vy_log.latch) == fiber()); - say_debug("%s: %s", __func__, vy_log_record_str(record)); - size_t svp = region_used(&vy_log.pool); struct vy_log_record *tx_record = vy_log_record_dup(&vy_log.pool, @@ -1234,6 +1241,8 @@ vy_log_write(const struct vy_log_record *record) return; } + say_debug("%s: %s", __func__, vy_log_record_str(tx_record)); + stailq_add_tail_entry(&vy_log.tx, tx_record, in_tx); vy_log.tx_size++; if (vy_log.tx_begin == NULL) { @@ -1318,10 +1327,11 @@ vy_recovery_lookup_slice(struct vy_recovery *recovery, int64_t slice_id) static int vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn, uint32_t index_id, uint32_t space_id, - const struct key_def *key_def) + const struct key_part_def *key_parts, + uint32_t key_part_count) { struct vy_index_recovery_info *index; - struct key_def *key_def_copy; + struct key_part_def *key_parts_copy; struct mh_i64ptr_node_t node; struct mh_i64ptr_t *h; mh_int_t k; @@ -1330,15 +1340,19 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn, * Make a copy of the key definition to be used for * the new index incarnation. */ - if (key_def == NULL) { + if (key_parts == NULL) { diag_set(ClientError, ER_INVALID_VYLOG_FILE, tt_sprintf("Missing key definition for index %lld", (long long)index_lsn)); return -1; } - key_def_copy = key_def_dup(key_def); - if (key_def_copy == NULL) + key_parts_copy = malloc(sizeof(*key_parts) * key_part_count); + if (key_parts_copy == NULL) { + diag_set(OutOfMemory, sizeof(*key_parts) * key_part_count, + "malloc", "struct key_part_def"); return -1; + } + memcpy(key_parts_copy, key_parts, sizeof(*key_parts) * key_part_count); /* * Look up the index in the hash. @@ -1359,7 +1373,7 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn, if (index == NULL) { diag_set(OutOfMemory, sizeof(*index), "malloc", "struct vy_index_recovery_info"); - free(key_def_copy); + free(key_parts_copy); return -1; } index->index_id = index_id; @@ -1371,7 +1385,7 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn, if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) { diag_set(OutOfMemory, 0, "mh_i64ptr_put", "mh_i64ptr_node_t"); - free(key_def_copy); + free(key_parts_copy); free(index); return -1; } @@ -1387,16 +1401,17 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn, tt_sprintf("Index %u/%u created twice", (unsigned)space_id, (unsigned)index_id)); - free(key_def_copy); + free(key_parts_copy); return -1; } assert(index->index_id == index_id); assert(index->space_id == space_id); - free(index->key_def); + free(index->key_parts); } index->index_lsn = index_lsn; - index->key_def = key_def_copy; + index->key_parts = key_parts_copy; + index->key_part_count = key_part_count; index->is_dropped = false; index->dump_lsn = -1; index->truncate_count = 0; @@ -1920,7 +1935,7 @@ vy_recovery_process_record(struct vy_recovery *recovery, case VY_LOG_CREATE_INDEX: rc = vy_recovery_create_index(recovery, record->index_lsn, record->index_id, record->space_id, - record->key_def); + record->key_parts, record->key_part_count); break; case VY_LOG_DROP_INDEX: rc = vy_recovery_drop_index(recovery, record->index_lsn); @@ -2094,7 +2109,7 @@ vy_recovery_delete(struct vy_recovery *recovery) mh_foreach(recovery->index_id_hash, i) { struct vy_index_recovery_info *index; index = mh_i64ptr_node(recovery->index_id_hash, i)->val; - free(index->key_def); + free(index->key_parts); free(index); } mh_i64ptr_delete(recovery->index_id_hash); @@ -2136,7 +2151,8 @@ vy_recovery_iterate_index(struct vy_index_recovery_info *index, record.index_lsn = index->index_lsn; record.index_id = index->index_id; record.space_id = index->space_id; - record.key_def = index->key_def; + record.key_parts = index->key_parts; + record.key_part_count = index->key_part_count; if (vy_recovery_cb_call(cb, cb_arg, &record) != 0) return -1; diff --git a/src/box/vy_log.h b/src/box/vy_log.h index 4698099e75..2024010d31 100644 --- a/src/box/vy_log.h +++ b/src/box/vy_log.h @@ -57,6 +57,7 @@ extern "C" { struct xlog; struct vclock; struct key_def; +struct key_part_def; struct vy_recovery; @@ -195,6 +196,10 @@ struct vy_log_record { uint32_t space_id; /** Index key definition, as defined by the user. */ const struct key_def *key_def; + /** Array of key part definitions. */ + struct key_part_def *key_parts; + /** Number of key parts. */ + uint32_t key_part_count; /** Max LSN stored on disk. */ int64_t dump_lsn; /** -- GitLab