diff --git a/src/box/alter.cc b/src/box/alter.cc index d7191c1bba42d77b3d6f538b72c69e2c77acce59..5749740d213950501b98ca91260a8a0f08a657b9 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -661,6 +661,12 @@ struct alter_space { * substantially. */ struct key_def *pk_def; + /** + * Min field count of a new space. It is calculated before + * the new space is created and used to update optionality + * of key_defs and key_parts. + */ + uint32_t new_min_field_count; }; static struct alter_space * @@ -671,6 +677,10 @@ alter_space_new(struct space *old_space) rlist_create(&alter->ops); alter->old_space = old_space; alter->space_def = space_def_dup_xc(alter->old_space->def); + if (old_space->format != NULL) + alter->new_min_field_count = old_space->format->min_field_count; + else + alter->new_min_field_count = 0; return alter; } @@ -1359,15 +1369,32 @@ alter_space_move_indexes(struct alter_space *alter, uint32_t begin, uint32_t end) { struct space *old_space = alter->old_space; + bool is_min_field_count_changed; + if (old_space->format != NULL) { + is_min_field_count_changed = + old_space->format->min_field_count != + alter->new_min_field_count; + } else { + is_min_field_count_changed = false; + } for (uint32_t index_id = begin; index_id < end; ++index_id) { struct index *old_index = space_index(old_space, index_id); if (old_index == NULL) continue; struct index_def *old_def = old_index->def; + struct index_def *new_def; + uint32_t min_field_count = alter->new_min_field_count; if ((old_def->opts.is_unique && !old_def->key_def->is_nullable) || old_def->type != TREE || alter->pk_def == NULL) { - (void) new MoveIndex(alter, old_def->iid); + if (is_min_field_count_changed) { + new_def = index_def_dup(old_def); + index_def_update_optionality(new_def, + min_field_count); + (void) new ModifyIndex(alter, new_def, old_def); + } else { + (void) new MoveIndex(alter, old_def->iid); + } continue; } /* @@ -1375,11 +1402,11 @@ alter_space_move_indexes(struct alter_space *alter, uint32_t begin, * the primary, since primary key parts have * changed. */ - struct index_def *new_def = - index_def_new(old_def->space_id, old_def->iid, - old_def->name, strlen(old_def->name), - old_def->type, &old_def->opts, - old_def->key_def, alter->pk_def); + new_def = index_def_new(old_def->space_id, old_def->iid, + old_def->name, strlen(old_def->name), + old_def->type, &old_def->opts, + old_def->key_def, alter->pk_def); + index_def_update_optionality(new_def, min_field_count); auto guard = make_scoped_guard([=] { index_def_delete(new_def); }); (void) new RebuildIndex(alter, new_def, old_def); guard.is_active = false; @@ -1670,16 +1697,19 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event) * First, move all unchanged indexes from the old space * to the new one. */ - alter_space_move_indexes(alter, 0, iid); /* Case 1: drop the index, if it is dropped. */ if (old_index != NULL && new_tuple == NULL) { + alter_space_move_indexes(alter, 0, iid); (void) new DropIndex(alter, old_index->def); } /* Case 2: create an index, if it is simply created. */ if (old_index == NULL && new_tuple != NULL) { + alter_space_move_indexes(alter, 0, iid); CreateIndex *create_index = new CreateIndex(alter); create_index->new_index_def = index_def_new_from_tuple(new_tuple, old_space); + index_def_update_optionality(create_index->new_index_def, + alter->new_min_field_count); } /* Case 3 and 4: check if we need to rebuild index data. */ if (old_index != NULL && new_tuple != NULL) { @@ -1687,6 +1717,43 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event) index_def = index_def_new_from_tuple(new_tuple, old_space); auto index_def_guard = make_scoped_guard([=] { index_def_delete(index_def); }); + /* + * To detect which key parts are optional, + * min_field_count is required. But + * min_field_count from the old space format can + * not be used. For example, consider the case, + * when a space has no format, has a primary index + * on the first field and has a single secondary + * index on a non-nullable second field. Min field + * count here is 2. Now alter the secondary index + * to make its part be nullable. In the + * 'old_space' min_field_count is still 2, but + * actually it is already 1. Actual + * min_field_count must be calculated using old + * unchanged indexes, NEW definition of an updated + * index and a space format, defined by a user. + */ + struct key_def **keys; + size_t bsize = old_space->index_count * sizeof(keys[0]); + keys = (struct key_def **) region_alloc_xc(&fiber()->gc, + bsize); + for (uint32_t i = 0, j = 0; i < old_space->index_count; + ++i) { + struct index_def *d = old_space->index[i]->def; + if (d->iid != index_def->iid) + keys[j++] = d->key_def; + else + keys[j++] = index_def->key_def; + } + struct space_def *def = old_space->def; + alter->new_min_field_count = + tuple_format_min_field_count(keys, + old_space->index_count, + def->fields, + def->field_count); + index_def_update_optionality(index_def, + alter->new_min_field_count); + alter_space_move_indexes(alter, 0, iid); if (index_def_cmp(index_def, old_index->def) == 0) { /* Index is not changed so just move it. */ (void) new MoveIndex(alter, old_index->def->iid); diff --git a/src/box/index_def.h b/src/box/index_def.h index 18f941364d0f56ed4a4d2552ce4ac9d545b6fef0..251506a85e8c1f7afe8d338e26e097f86f28e8cd 100644 --- a/src/box/index_def.h +++ b/src/box/index_def.h @@ -163,6 +163,19 @@ index_def_dup(const struct index_def *def); void index_def_delete(struct index_def *def); +/** + * Update 'has_optional_parts' property of key definitions. + * @param def Index def, containing key definitions to update. + * @param min_field_count Minimal field count. All parts out of + * this value are optional. + */ +static inline void +index_def_update_optionality(struct index_def *def, uint32_t min_field_count) +{ + key_def_update_optionality(def->key_def, min_field_count); + key_def_update_optionality(def->cmp_def, min_field_count); +} + /** * Add an index definition to a list, preserving the * first position of the primary key. diff --git a/src/box/key_def.cc b/src/box/key_def.cc index 06b12dfd2219b141f6c542d7c72c2515bc468171..955349cf33c653753006958f4759ba20f53aa064 100644 --- a/src/box/key_def.cc +++ b/src/box/key_def.cc @@ -276,6 +276,24 @@ key_def_set_part(struct key_def *def, uint32_t part_no, uint32_t fieldno, key_def_set_cmp(def); } +void +key_def_update_optionality(struct key_def *def, uint32_t min_field_count) +{ + def->has_optional_parts = false; + for (uint32_t i = 0; i < def->part_count; ++i) { + struct key_part *part = &def->parts[i]; + def->has_optional_parts |= part->is_nullable && + min_field_count < part->fieldno + 1; + /* + * One optional part is enough to switch to new + * comparators. + */ + if (def->has_optional_parts) + break; + } + key_def_set_cmp(def); +} + int key_def_snprint_parts(char *buf, int size, const struct key_part_def *parts, uint32_t part_count) @@ -531,6 +549,8 @@ key_def_merge(const struct key_def *first, const struct key_def *second) new_def->part_count = new_part_count; new_def->unique_part_count = new_part_count; new_def->is_nullable = first->is_nullable || second->is_nullable; + new_def->has_optional_parts = first->has_optional_parts || + second->has_optional_parts; /* Write position in the new key def. */ uint32_t pos = 0; /* Append first key def's parts to the new index_def. */ diff --git a/src/box/key_def.h b/src/box/key_def.h index 50cc547c1e8bd5836fc355d296a803646166dd73..4726c38a22025caebfd6d644547d12c24bf5a667 100644 --- a/src/box/key_def.h +++ b/src/box/key_def.h @@ -126,6 +126,11 @@ struct key_def { uint32_t unique_part_count; /** True, if at least one part can store NULL. */ bool is_nullable; + /** + * True, if some key parts can be absent in a tuple. These + * fields assumed to be MP_NIL. + */ + bool has_optional_parts; /** Key fields mask. @sa column_mask.h for details. */ uint64_t column_mask; /** The size of the 'parts' array. */ @@ -232,6 +237,16 @@ void key_def_set_part(struct key_def *def, uint32_t part_no, uint32_t fieldno, enum field_type type, bool is_nullable, struct coll *coll); +/** + * Update 'has_optional_parts' of @a key_def with correspondence + * to @a min_field_count. + * @param def Key definition to update. + * @param min_field_count Minimal field count. All parts out of + * this value are optional. + */ +void +key_def_update_optionality(struct key_def *def, uint32_t min_field_count); + /** * An snprint-style function to print a key definition. */ diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc index 3c6adfbc6ca1f58737739ad23ec5b9b684d25a2a..edbbfb12dc878b6dc430f2b8bbd402b2cede2b5e 100644 --- a/src/box/tuple_compare.cc +++ b/src/box/tuple_compare.cc @@ -426,15 +426,23 @@ tuple_compare_field_with_hint(const char *field_a, enum mp_type a_type, } } -template<bool is_nullable> +template<bool is_nullable, bool has_optional_parts> static inline int tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b, const struct key_def *key_def) { + assert(!has_optional_parts || is_nullable); + assert(is_nullable == key_def->is_nullable); + assert(has_optional_parts == key_def->has_optional_parts); const struct key_part *part = key_def->parts; const char *tuple_a_raw = tuple_data(tuple_a); const char *tuple_b_raw = tuple_data(tuple_b); if (key_def->part_count == 1 && part->fieldno == 0) { + /* + * First field can not be optional - empty tuples + * can not exist. + */ + assert(!has_optional_parts); mp_decode_array(&tuple_a_raw); mp_decode_array(&tuple_b_raw); if (! is_nullable) { @@ -458,8 +466,8 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b, const uint32_t *field_map_a = tuple_field_map(tuple_a); const uint32_t *field_map_b = tuple_field_map(tuple_b); const struct key_part *end; - const char *field_a; - const char *field_b; + const char *field_a, *field_b; + enum mp_type a_type, b_type; int rc; if (is_nullable) end = part + key_def->unique_part_count; @@ -471,7 +479,7 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b, part->fieldno); field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b, part->fieldno); - assert(field_a != NULL && field_b != NULL); + assert(has_optional_parts || field_a != NULL && field_b != NULL); if (! is_nullable) { rc = tuple_compare_field(field_a, field_b, part->type, part->coll); @@ -480,8 +488,13 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b, else continue; } - enum mp_type a_type = mp_typeof(*field_a); - enum mp_type b_type = mp_typeof(*field_b); + if (has_optional_parts) { + a_type = field_a != NULL ? mp_typeof(*field_a) : MP_NIL; + b_type = field_b != NULL ? mp_typeof(*field_b) : MP_NIL; + } else { + a_type = mp_typeof(*field_a); + b_type = mp_typeof(*field_b); + } if (a_type == MP_NIL) { if (b_type != MP_NIL) return -1; @@ -515,6 +528,10 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b, part->fieldno); field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b, part->fieldno); + /* + * Extended parts are primary, and they can not + * be absent or be NULLs. + */ assert(field_a != NULL && field_b != NULL); rc = tuple_compare_field(field_a, field_b, part->type, part->coll); @@ -524,18 +541,22 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b, return 0; } -template<bool is_nullable> +template<bool is_nullable, bool has_optional_parts> static inline int tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key, uint32_t part_count, const struct key_def *key_def) { + assert(!has_optional_parts || is_nullable); + assert(is_nullable == key_def->is_nullable); + assert(has_optional_parts == key_def->has_optional_parts); assert(key != NULL || part_count == 0); assert(part_count <= key_def->part_count); const struct key_part *part = key_def->parts; const struct tuple_format *format = tuple_format(tuple); const char *tuple_raw = tuple_data(tuple); const uint32_t *field_map = tuple_field_map(tuple); + enum mp_type a_type, b_type; if (likely(part_count == 1)) { const char *field; field = tuple_field_raw(format, tuple_raw, field_map, @@ -544,8 +565,11 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key, return tuple_compare_field(field, key, part->type, part->coll); } - enum mp_type a_type = mp_typeof(*field); - enum mp_type b_type = mp_typeof(*key); + if (has_optional_parts) + a_type = field != NULL ? mp_typeof(*field) : MP_NIL; + else + a_type = mp_typeof(*field); + b_type = mp_typeof(*key); if (a_type == MP_NIL) { return b_type == MP_NIL ? 0 : -1; } else if (b_type == MP_NIL) { @@ -564,15 +588,18 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key, field = tuple_field_raw(format, tuple_raw, field_map, part->fieldno); if (! is_nullable) { - int rc = tuple_compare_field(field, key, part->type, - part->coll); + rc = tuple_compare_field(field, key, part->type, + part->coll); if (rc != 0) return rc; else continue; } - enum mp_type a_type = mp_typeof(*field); - enum mp_type b_type = mp_typeof(*key); + if (has_optional_parts) + a_type = field != NULL ? mp_typeof(*field) : MP_NIL; + else + a_type = mp_typeof(*field); + b_type = mp_typeof(*key); if (a_type == MP_NIL) { if (b_type != MP_NIL) return -1; @@ -645,19 +672,46 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count, return 0; } -template<bool is_nullable> +template<bool is_nullable, bool has_optional_parts> static inline int -tuple_compare_with_key_sequential(const struct tuple *tuple, - const char *key, uint32_t part_count, const struct key_def *key_def) +tuple_compare_with_key_sequential(const struct tuple *tuple, const char *key, + uint32_t part_count, + const struct key_def *key_def) { + assert(!has_optional_parts || is_nullable); assert(key_def_is_sequential(key_def)); + assert(is_nullable == key_def->is_nullable); + assert(has_optional_parts == key_def->has_optional_parts); const char *tuple_key = tuple_data(tuple); - uint32_t tuple_field_count = mp_decode_array(&tuple_key); - assert(tuple_field_count >= key_def->part_count); - assert(part_count <= key_def->part_count); - (void) tuple_field_count; - return key_compare_parts<is_nullable>(tuple_key, key, part_count, - key_def); + uint32_t field_count = mp_decode_array(&tuple_key); + uint32_t cmp_part_count; + if (has_optional_parts && field_count < part_count) { + cmp_part_count = field_count; + } else { + assert(field_count >= part_count); + cmp_part_count = part_count; + } + int rc = key_compare_parts<is_nullable>(tuple_key, key, cmp_part_count, + key_def); + if (!has_optional_parts || rc != 0) + return rc; + /* + * If some tuple indexed fields are absent, then check + * corresponding key fields to be equal to NULL. + */ + if (field_count < part_count) { + /* + * Key's and tuple's first field_count fields are + * equal, and their bsize too. + */ + key += tuple->bsize - mp_sizeof_array(field_count); + for (uint32_t i = field_count; i < part_count; + ++i, mp_next(&key)) { + if (mp_typeof(*key) != MP_NIL) + return -1; + } + } + return 0; } int @@ -679,19 +733,21 @@ key_compare(const char *key_a, const char *key_b, } } -template<bool is_nullable> +template <bool is_nullable, bool has_optional_parts> static int tuple_compare_sequential(const struct tuple *tuple_a, const struct tuple *tuple_b, const struct key_def *key_def) { + assert(!has_optional_parts || is_nullable); + assert(has_optional_parts == key_def->has_optional_parts); assert(key_def_is_sequential(key_def)); assert(is_nullable == key_def->is_nullable); const char *key_a = tuple_data(tuple_a); uint32_t fc_a = mp_decode_array(&key_a); const char *key_b = tuple_data(tuple_b); uint32_t fc_b = mp_decode_array(&key_b); - if (! is_nullable) { + if (!has_optional_parts && !is_nullable) { assert(fc_a >= key_def->part_count); assert(fc_b >= key_def->part_count); return key_compare_parts<false>(key_a, key_b, @@ -702,9 +758,15 @@ tuple_compare_sequential(const struct tuple *tuple_a, const struct key_part *end = part + key_def->unique_part_count; int rc; uint32_t i = 0; - for (; part < end; ++part, ++i, mp_next(&key_a), mp_next(&key_b)) { - enum mp_type a_type = mp_typeof(*key_a); - enum mp_type b_type = mp_typeof(*key_b); + for (; part < end; ++part, ++i) { + enum mp_type a_type, b_type; + if (has_optional_parts) { + a_type = i >= fc_a ? MP_NIL : mp_typeof(*key_a); + b_type = i >= fc_b ? MP_NIL : mp_typeof(*key_b); + } else { + a_type = mp_typeof(*key_a); + b_type = mp_typeof(*key_b); + } if (a_type == MP_NIL) { if (b_type != MP_NIL) return -1; @@ -718,11 +780,21 @@ tuple_compare_sequential(const struct tuple *tuple_a, if (rc != 0) return rc; } + if (!has_optional_parts || i < fc_a) + mp_next(&key_a); + if (!has_optional_parts || i < fc_b) + mp_next(&key_b); } if (! was_null_met) return 0; end = key_def->parts + key_def->part_count; for (; part < end; ++part, ++i, mp_next(&key_a), mp_next(&key_b)) { + /* + * If tuples are equal by unique_part_count, then + * the rest of parts are a primary key, which can + * not be absent or be null. + */ + assert(i < fc_a && i < fc_b); rc = tuple_compare_field(key_a, key_b, part->type, part->coll); if (rc != 0) @@ -911,13 +983,21 @@ static const comparator_signature cmp_arr[] = { #undef COMPARATOR tuple_compare_t -tuple_compare_create(const struct key_def *def) { +tuple_compare_create(const struct key_def *def) +{ if (def->is_nullable) { - if (key_def_is_sequential(def)) - return tuple_compare_sequential<true>; - else - return tuple_compare_slowpath<true>; + if (key_def_is_sequential(def)) { + if (def->has_optional_parts) + return tuple_compare_sequential<true, true>; + else + return tuple_compare_sequential<true, false>; + } else if (def->has_optional_parts) { + return tuple_compare_slowpath<true, true>; + } else { + return tuple_compare_slowpath<true, false>; + } } + assert(! def->has_optional_parts); if (!key_def_has_collation(def)) { /* Precalculated comparators don't use collation */ for (uint32_t k = 0; @@ -935,9 +1015,9 @@ tuple_compare_create(const struct key_def *def) { } } if (key_def_is_sequential(def)) - return tuple_compare_sequential<false>; + return tuple_compare_sequential<false, false>; else - return tuple_compare_slowpath<false>; + return tuple_compare_slowpath<false, false>; } /* }}} tuple_compare */ @@ -1123,10 +1203,21 @@ tuple_compare_with_key_t tuple_compare_with_key_create(const struct key_def *def) { if (def->is_nullable) { - if (key_def_is_sequential(def)) - return tuple_compare_with_key_sequential<true>; - return tuple_compare_with_key_slowpath<true>; + if (key_def_is_sequential(def)) { + if (def->has_optional_parts) { + return tuple_compare_with_key_sequential<true, + true>; + } else { + return tuple_compare_with_key_sequential<true, + false>; + } + } else if (def->has_optional_parts) { + return tuple_compare_with_key_slowpath<true, true>; + } else { + return tuple_compare_with_key_slowpath<true, false>; + } } + assert(! def->has_optional_parts); if (!key_def_has_collation(def)) { /* Precalculated comparators don't use collation */ for (uint32_t k = 0; @@ -1147,8 +1238,9 @@ tuple_compare_with_key_create(const struct key_def *def) } } if (key_def_is_sequential(def)) - return tuple_compare_with_key_sequential<false>; - return tuple_compare_with_key_slowpath<false>; + return tuple_compare_with_key_sequential<false, false>; + else + return tuple_compare_with_key_slowpath<false, false>; } /* }}} tuple_compare_with_key */ diff --git a/src/box/tuple_extract_key.cc b/src/box/tuple_extract_key.cc index ea58a536e11d3bba851d3985acd6a2829bdd3ed1..880abb6bff744cef4bc8504f6d1bea7c4948893e 100644 --- a/src/box/tuple_extract_key.cc +++ b/src/box/tuple_extract_key.cc @@ -2,29 +2,52 @@ #include "tuple.h" #include "fiber.h" +enum { MSGPACK_NULL = 0xc0 }; + +/** True, if a key con contain two or more parts in sequence. */ +static bool +key_def_contains_sequential_parts(const struct key_def *def) +{ + for (uint32_t i = 0; i < def->part_count - 1; ++i) { + if (def->parts[i].fieldno + 1 == def->parts[i + 1].fieldno) + return true; + } + return false; +} + /** * Optimized version of tuple_extract_key_raw() for sequential key defs * @copydoc tuple_extract_key_raw() */ +template <bool has_optional_parts> static char * tuple_extract_key_sequential_raw(const char *data, const char *data_end, const struct key_def *key_def, uint32_t *key_size) { + assert(!has_optional_parts || key_def->is_nullable); assert(key_def_is_sequential(key_def)); + assert(has_optional_parts == key_def->has_optional_parts); + assert(data_end != NULL); + assert(mp_sizeof_nil() == 1); const char *field_start = data; uint32_t bsize = mp_sizeof_array(key_def->part_count); - - mp_decode_array(&field_start); + uint32_t field_count = mp_decode_array(&field_start); const char *field_end = field_start; - - for (uint32_t i = 0; i < key_def->part_count; i++) - mp_next(&field_end); + uint32_t null_count; + if (!has_optional_parts || field_count > key_def->part_count) { + for (uint32_t i = 0; i < key_def->part_count; i++) + mp_next(&field_end); + null_count = 0; + } else { + assert(key_def->is_nullable); + null_count = key_def->part_count - field_count; + field_end = data_end; + bsize += null_count * mp_sizeof_nil(); + } + assert(field_end - field_start <= data_end - data); bsize += field_end - field_start; - assert(!data_end || (field_end - field_start <= data_end - data)); - (void) data_end; - char *key = (char *) region_alloc(&fiber()->gc, bsize); if (key == NULL) { diag_set(OutOfMemory, bsize, "region", @@ -33,6 +56,10 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end, } char *key_buf = mp_encode_array(key, key_def->part_count); memcpy(key_buf, field_start, field_end - field_start); + if (has_optional_parts && null_count > 0) { + key_buf += field_end - field_start; + memset(key_buf, MSGPACK_NULL, null_count); + } if (key_size != NULL) *key_size = bsize; @@ -43,54 +70,78 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end, * Optimized version of tuple_extract_key() for sequential key defs * @copydoc tuple_extract_key() */ +template <bool has_optional_parts> static inline char * tuple_extract_key_sequential(const struct tuple *tuple, const struct key_def *key_def, uint32_t *key_size) { assert(key_def_is_sequential(key_def)); + assert(!has_optional_parts || key_def->is_nullable); + assert(has_optional_parts == key_def->has_optional_parts); const char *data = tuple_data(tuple); - return tuple_extract_key_sequential_raw(data, NULL, key_def, key_size); + const char *data_end = data + tuple->bsize; + return tuple_extract_key_sequential_raw<has_optional_parts>(data, + data_end, + key_def, + key_size); } /** * General-purpose implementation of tuple_extract_key() * @copydoc tuple_extract_key() */ -template <bool contains_sequential_parts> +template <bool contains_sequential_parts, bool has_optional_parts> static char * tuple_extract_key_slowpath(const struct tuple *tuple, const struct key_def *key_def, uint32_t *key_size) { + assert(!has_optional_parts || key_def->is_nullable); + assert(has_optional_parts == key_def->has_optional_parts); + assert(contains_sequential_parts == + key_def_contains_sequential_parts(key_def)); + assert(mp_sizeof_nil() == 1); const char *data = tuple_data(tuple); uint32_t part_count = key_def->part_count; uint32_t bsize = mp_sizeof_array(part_count); const struct tuple_format *format = tuple_format(tuple); const uint32_t *field_map = tuple_field_map(tuple); + const char *tuple_end = data + tuple->bsize; /* Calculate the key size. */ for (uint32_t i = 0; i < part_count; ++i) { const char *field = tuple_field_raw(format, data, field_map, key_def->parts[i].fieldno); + if (has_optional_parts && field == NULL) { + bsize += mp_sizeof_nil(); + continue; + } + assert(field != NULL); const char *end = field; if (contains_sequential_parts) { /* * Skip sequential part in order to * minimize tuple_field_raw() calls. */ - for (; i < key_def->part_count - 1; i++) { + for (; i < part_count - 1; i++) { if (key_def->parts[i].fieldno + 1 != - key_def->parts[i + 1].fieldno) { + key_def->parts[i + 1].fieldno) { /* * End of sequential part. */ break; } - mp_next(&end); + if (!has_optional_parts || end < tuple_end) + mp_next(&end); + else + bsize += mp_sizeof_nil(); } } - mp_next(&end); + if (!has_optional_parts || end < tuple_end) + mp_next(&end); + else + bsize += mp_sizeof_nil(); bsize += end - field; } @@ -104,27 +155,42 @@ tuple_extract_key_slowpath(const struct tuple *tuple, const char *field = tuple_field_raw(format, data, field_map, key_def->parts[i].fieldno); + if (has_optional_parts && field == NULL) { + key_buf = mp_encode_nil(key_buf); + continue; + } const char *end = field; + uint32_t null_count = 0; if (contains_sequential_parts) { /* * Skip sequential part in order to * minimize tuple_field_raw() calls. */ - for (; i < key_def->part_count - 1; i++) { + for (; i < part_count - 1; i++) { if (key_def->parts[i].fieldno + 1 != - key_def->parts[i + 1].fieldno) { + key_def->parts[i + 1].fieldno) { /* * End of sequential part. */ break; } - mp_next(&end); + if (!has_optional_parts || end < tuple_end) + mp_next(&end); + else + ++null_count; } } - mp_next(&end); + if (!has_optional_parts || end < tuple_end) + mp_next(&end); + else + ++null_count; bsize = end - field; memcpy(key_buf, field, bsize); key_buf += bsize; + if (has_optional_parts && null_count != 0) { + memset(key_buf, MSGPACK_NULL, null_count); + key_buf += null_count * mp_sizeof_nil(); + } } if (key_size != NULL) *key_size = key_buf - key; @@ -135,11 +201,15 @@ tuple_extract_key_slowpath(const struct tuple *tuple, * General-purpose version of tuple_extract_key_raw() * @copydoc tuple_extract_key_raw() */ +template <bool has_optional_parts> static char * tuple_extract_key_slowpath_raw(const char *data, const char *data_end, const struct key_def *key_def, uint32_t *key_size) { + assert(!has_optional_parts || key_def->is_nullable); + assert(has_optional_parts == key_def->has_optional_parts); + assert(mp_sizeof_nil() == 1); /* allocate buffer with maximal possible size */ char *key = (char *) region_alloc(&fiber()->gc, data_end - data); if (key == NULL) { @@ -149,7 +219,12 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end, } char *key_buf = mp_encode_array(key, key_def->part_count); const char *field0 = data; - mp_decode_array(&field0); + uint32_t field_count = mp_decode_array(&field0); + /* + * A tuple can not be empty - at least a pk always exists. + */ + assert(field_count > 0); + (void) field_count; const char *field0_end = field0; mp_next(&field0_end); const char *field = field0; @@ -157,11 +232,14 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end, uint32_t current_fieldno = 0; for (uint32_t i = 0; i < key_def->part_count; i++) { uint32_t fieldno = key_def->parts[i].fieldno; + uint32_t null_count = 0; for (; i < key_def->part_count - 1; i++) { if (key_def->parts[i].fieldno + 1 != key_def->parts[i + 1].fieldno) break; } + uint32_t end_fieldno = key_def->parts[i].fieldno; + if (fieldno < current_fieldno) { /* Rewind. */ field = field0; @@ -169,6 +247,19 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end, current_fieldno = 0; } + /* + * First fieldno in a key columns can be out of + * tuple size for nullable indexes because of + * absense of indexed fields. Treat such fields + * as NULLs. + */ + if (has_optional_parts && fieldno >= field_count) { + /* Nullify entire columns range. */ + null_count = fieldno - end_fieldno + 1; + memset(key_buf, MSGPACK_NULL, null_count); + key_buf += null_count * mp_sizeof_nil(); + continue; + } while (current_fieldno < fieldno) { /* search first field of key in tuple raw data */ field = field_end; @@ -176,31 +267,33 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end, current_fieldno++; } - while (current_fieldno < key_def->parts[i].fieldno) { - /* search the last field in subsequence */ - mp_next(&field_end); - current_fieldno++; + /* + * If the last fieldno is out of tuple size, then + * fill rest of columns with NULLs. + */ + if (has_optional_parts && end_fieldno >= field_count) { + null_count = end_fieldno - field_count + 1; + field_end = data_end; + } else { + while (current_fieldno < end_fieldno) { + mp_next(&field_end); + current_fieldno++; + } } memcpy(key_buf, field, field_end - field); key_buf += field_end - field; - assert(key_buf - key <= data_end - data); + if (has_optional_parts && null_count != 0) { + memset(key_buf, MSGPACK_NULL, null_count); + key_buf += null_count * mp_sizeof_nil(); + } else { + assert(key_buf - key <= data_end - data); + } } if (key_size != NULL) *key_size = (uint32_t)(key_buf - key); return key; } -/** True, if a key con contain two or more parts in sequence. */ -static bool -key_def_contains_sequential_parts(struct key_def *def) -{ - for (uint32_t i = 0; i < def->part_count - 1; ++i) { - if (def->parts[i].fieldno + 1 == def->parts[i + 1].fieldno) - return true; - } - return false; -} - /** * Initialize tuple_extract_key() and tuple_extract_key_raw() */ @@ -208,16 +301,45 @@ void tuple_extract_key_set(struct key_def *key_def) { if (key_def_is_sequential(key_def)) { - key_def->tuple_extract_key = tuple_extract_key_sequential; - key_def->tuple_extract_key_raw = tuple_extract_key_sequential_raw; - } else { - if (key_def_contains_sequential_parts(key_def)) { + if (key_def->has_optional_parts) { + assert(key_def->is_nullable); key_def->tuple_extract_key = - tuple_extract_key_slowpath<true>; + tuple_extract_key_sequential<true>; + key_def->tuple_extract_key_raw = + tuple_extract_key_sequential_raw<true>; } else { key_def->tuple_extract_key = - tuple_extract_key_slowpath<false>; + tuple_extract_key_sequential<false>; + key_def->tuple_extract_key_raw = + tuple_extract_key_sequential_raw<false>; } - key_def->tuple_extract_key_raw = tuple_extract_key_slowpath_raw; + } else { + if (key_def->has_optional_parts) { + assert(key_def->is_nullable); + if (key_def_contains_sequential_parts(key_def)) { + key_def->tuple_extract_key = + tuple_extract_key_slowpath<true, true>; + } else { + key_def->tuple_extract_key = + tuple_extract_key_slowpath<false, true>; + } + } else { + if (key_def_contains_sequential_parts(key_def)) { + key_def->tuple_extract_key = + tuple_extract_key_slowpath<true, false>; + } else { + key_def->tuple_extract_key = + tuple_extract_key_slowpath<false, + false>; + } + } + } + if (key_def->has_optional_parts) { + assert(key_def->is_nullable); + key_def->tuple_extract_key_raw = + tuple_extract_key_slowpath_raw<true>; + } else { + key_def->tuple_extract_key_raw = + tuple_extract_key_slowpath_raw<false>; } } diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c index e42fc039eda462d76c99163c6b124cd624213f5a..e458f49a0d4fae40dee0ed2dde2076fc9acfee14 100644 --- a/src/box/tuple_format.c +++ b/src/box/tuple_format.c @@ -49,6 +49,9 @@ tuple_format_create(struct tuple_format *format, struct key_def * const *keys, uint16_t key_count, const struct field_def *fields, uint32_t field_count) { + format->min_field_count = + tuple_format_min_field_count(keys, key_count, fields, + field_count); if (format->field_count == 0) { format->field_map_size = 0; return 0; @@ -59,8 +62,6 @@ tuple_format_create(struct tuple_format *format, struct key_def * const *keys, format->fields[i].type = fields[i].type; format->fields[i].offset_slot = TUPLE_OFFSET_SLOT_NIL; format->fields[i].is_nullable = fields[i].is_nullable; - if (i + 1 > format->min_field_count && !fields[i].is_nullable) - format->min_field_count = i + 1; } /* Initialize remaining fields */ for (uint32_t i = field_count; i < format->field_count; i++) @@ -236,7 +237,7 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count, format->field_count = field_count; format->index_field_count = index_field_count; format->exact_field_count = 0; - format->min_field_count = index_field_count; + format->min_field_count = 0; return format; } @@ -400,6 +401,14 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map, ++field; uint32_t i = 1; uint32_t defined_field_count = MIN(field_count, format->field_count); + if (field_count < format->index_field_count) { + /* + * Nullify field map to be able to detect by 0, + * which key fields are absent in tuple_field(). + */ + memset((char *)field_map - format->field_map_size, 0, + format->field_map_size); + } for (; i < defined_field_count; ++i, ++field) { mp_type = mp_typeof(*pos); if (key_mp_type_validate(field->type, mp_type, ER_FIELD_TYPE, @@ -415,6 +424,28 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map, return 0; } +uint32_t +tuple_format_min_field_count(struct key_def * const *keys, uint16_t key_count, + const struct field_def *space_fields, + uint32_t space_field_count) +{ + uint32_t min_field_count = 0; + for (uint32_t i = 0; i < space_field_count; ++i) { + if (! space_fields[i].is_nullable) + min_field_count = i + 1; + } + for (uint32_t i = 0; i < key_count; ++i) { + const struct key_def *kd = keys[i]; + for (uint32_t j = 0; j < kd->part_count; ++j) { + const struct key_part *kp = &kd->parts[j]; + if (!kp->is_nullable && + kp->fieldno + 1 > min_field_count) + min_field_count = kp->fieldno + 1; + } + } + return min_field_count; +} + /** Destroy tuple format subsystem and free resourses */ void tuple_format_free() diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h index 77c8e404f326ca929af1713ceff79094984b7346..d35182df7d8b992e03f48273821271739a7b7584 100644 --- a/src/box/tuple_format.h +++ b/src/box/tuple_format.h @@ -245,6 +245,21 @@ tuple_format_meta_size(const struct tuple_format *format) return format->extra_size + format->field_map_size; } +/** + * Calculate minimal field count of tuples with specified keys and + * space format. + * @param keys Array of key definitions of indexes. + * @param key_count Length of @a keys. + * @param space_fields Array of fields from a space format. + * @param space_field_count Length of @a space_fields. + * + * @retval Minimal field count. + */ +uint32_t +tuple_format_min_field_count(struct key_def * const *keys, uint16_t key_count, + const struct field_def *space_fields, + uint32_t space_field_count); + typedef struct tuple_format box_tuple_format_t; /** \cond public */ @@ -313,7 +328,7 @@ static inline const char * tuple_field_raw(const struct tuple_format *format, const char *tuple, const uint32_t *field_map, uint32_t field_no) { - if (likely(field_no < format->field_count)) { + if (likely(field_no < format->index_field_count)) { /* Indexed field */ if (field_no == 0) { @@ -322,8 +337,12 @@ tuple_field_raw(const struct tuple_format *format, const char *tuple, } int32_t offset_slot = format->fields[field_no].offset_slot; - if (offset_slot != TUPLE_OFFSET_SLOT_NIL) - return tuple + field_map[offset_slot]; + if (offset_slot != TUPLE_OFFSET_SLOT_NIL) { + if (field_map[offset_slot] != 0) + return tuple + field_map[offset_slot]; + else + return NULL; + } } ERROR_INJECT(ERRINJ_TUPLE_FIELD, return NULL); uint32_t field_count = mp_decode_array(&tuple); diff --git a/src/box/tuple_hash.cc b/src/box/tuple_hash.cc index e08bc5ff56df3d62728989d9eda5ab37723182b2..0f7ba91d09f9be4eca5279e35c9912fa06b0804b 100644 --- a/src/box/tuple_hash.cc +++ b/src/box/tuple_hash.cc @@ -212,6 +212,7 @@ static const hasher_signature hash_arr[] = { #undef HASHER +template <bool has_optional_parts> uint32_t tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def); @@ -254,7 +255,10 @@ tuple_hash_func_set(struct key_def *key_def) { } slowpath: - key_def->tuple_hash = tuple_hash_slowpath; + if (key_def->has_optional_parts) + key_def->tuple_hash = tuple_hash_slowpath<true>; + else + key_def->tuple_hash = tuple_hash_slowpath<false>; key_def->key_hash = key_hash_slowpath; } @@ -295,17 +299,32 @@ tuple_hash_field(uint32_t *ph1, uint32_t *pcarry, const char **field, return size; } +static inline uint32_t +tuple_hash_null(uint32_t *ph1, uint32_t *pcarry) +{ + assert(mp_sizeof_nil() == 1); + const char null = 0xc0; + PMurHash32_Process(ph1, pcarry, &null, 1); + return mp_sizeof_nil(); +} +template <bool has_optional_parts> uint32_t tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def) { + assert(has_optional_parts == key_def->has_optional_parts); uint32_t h = HASH_SEED; uint32_t carry = 0; uint32_t total_size = 0; uint32_t prev_fieldno = key_def->parts[0].fieldno; - const char* field = tuple_field(tuple, key_def->parts[0].fieldno); - total_size += tuple_hash_field(&h, &carry, &field, - key_def->parts[0].coll); + const char *field = tuple_field(tuple, key_def->parts[0].fieldno); + const char *end = (char *)tuple + tuple_size(tuple); + if (has_optional_parts && field == NULL) { + total_size += tuple_hash_null(&h, &carry); + } else { + total_size += tuple_hash_field(&h, &carry, &field, + key_def->parts[0].coll); + } for (uint32_t part_id = 1; part_id < key_def->part_count; part_id++) { /* If parts of key_def are not sequential we need to call * tuple_field. Otherwise, tuple is hashed sequentially without @@ -314,8 +333,13 @@ tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def) if (prev_fieldno + 1 != key_def->parts[part_id].fieldno) { field = tuple_field(tuple, key_def->parts[part_id].fieldno); } - total_size += tuple_hash_field(&h, &carry, &field, - key_def->parts[part_id].coll); + if (has_optional_parts && (field == NULL || field >= end)) { + total_size += tuple_hash_null(&h, &carry); + } else { + total_size += + tuple_hash_field(&h, &carry, &field, + key_def->parts[part_id].coll); + } prev_fieldno = key_def->parts[part_id].fieldno; } diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c index 5e38a424dd5fbd75c95deb465478ab8a325fd756..84182e76262624b9da1ba768143caee494fc03f0 100644 --- a/src/box/vy_stmt.c +++ b/src/box/vy_stmt.c @@ -360,7 +360,7 @@ vy_stmt_new_surrogate_from_key(const char *key, enum iproto_type type, assert(part_count <= field_count); uint32_t nulls_count = field_count - cmp_def->part_count; uint32_t bsize = mp_sizeof_array(field_count) + - mp_sizeof_nil() * nulls_count; + mp_sizeof_nil() * nulls_count; for (uint32_t i = 0; i < part_count; ++i) { const struct key_part *part = &cmp_def->parts[i]; assert(part->fieldno < field_count); @@ -421,14 +421,25 @@ vy_stmt_new_surrogate_delete(struct tuple_format *format, const char *src_pos = src_data; uint32_t src_count = mp_decode_array(&src_pos); - uint32_t field_count = format->index_field_count; - assert(src_count >= field_count); - (void) src_count; + assert(src_count >= format->min_field_count); + uint32_t field_count; + if (src_count < format->index_field_count) { + field_count = src_count; + /* + * Nullify field map to be able to detect by 0, + * which key fields are absent in tuple_field(). + */ + memset((char *)field_map - format->field_map_size, 0, + format->field_map_size); + } else { + field_count = format->index_field_count; + } char *pos = mp_encode_array(data, field_count); for (uint32_t i = 0; i < field_count; ++i) { const struct tuple_field *field = &format->fields[i]; if (! field->is_key_part) { - /* Unindexed field - write NIL */ + /* Unindexed field - write NIL. */ + assert(i < src_count); pos = mp_encode_nil(pos); mp_next(&src_pos); continue; diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h index 201c62487d41da1c47df5e7d625ccdd780e894df..a33739d65984e30c4edf1ee3697dfc5d9987f5d6 100644 --- a/src/box/vy_stmt.h +++ b/src/box/vy_stmt.h @@ -687,7 +687,7 @@ vy_tuple_key_contains_null(const struct tuple *tuple, const struct key_def *def) { for (uint32_t i = 0; i < def->part_count; ++i) { const char *field = tuple_field(tuple, def->parts[i].fieldno); - if (mp_typeof(*field) == MP_NIL) + if (field == NULL || mp_typeof(*field) == MP_NIL) return true; } return false; diff --git a/test/engine/null.result b/test/engine/null.result index 5c3d57ff044b7f23c8ecfa2c8f5621dd5fb0d70b..4abf850213944aaeb0928003df85ac04ec2bea43 100644 --- a/test/engine/null.result +++ b/test/engine/null.result @@ -871,3 +871,692 @@ sk:select{} s:drop() --- ... +-- +-- gh-2988: allow absense of tail nullable indexed fields. +-- +s = box.schema.space.create('test', {engine = engine}) +--- +... +pk = s:create_index('pk') +--- +... +sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}}) +--- +... +-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath. +s:replace{} -- Fail +--- +- error: Tuple field count 0 is less than required by space format or defined indexes + (expected at least 1) +... +-- Compare full vs not full. +s:replace{2} +--- +- [2] +... +s:replace{1, 2} +--- +- [1, 2] +... +s:select{} +--- +- - [1, 2] + - [2] +... +sk:select{box.NULL} +--- +- - [2] +... +sk:select{2} +--- +- - [1, 2] +... +-- Compare not full vs full. +s:replace{4, 5} +--- +- [4, 5] +... +s:replace{3} +--- +- [3] +... +s:select{} +--- +- - [1, 2] + - [2] + - [3] + - [4, 5] +... +sk:select{box.NULL} +--- +- - [2] + - [3] +... +sk:select{5} +--- +- - [4, 5] +... +-- Compare extended keys. +s:replace{7} +--- +- [7] +... +s:replace{6} +--- +- [6] +... +s:select{} +--- +- - [1, 2] + - [2] + - [3] + - [4, 5] + - [6] + - [7] +... +sk:select{box.NULL} +--- +- - [2] + - [3] + - [6] + - [7] +... +sk:select{} +--- +- - [2] + - [3] + - [6] + - [7] + - [1, 2] + - [4, 5] +... +-- Test tuple extract key during dump for vinyl. +box.snapshot() +--- +- ok +... +sk:select{} +--- +- - [2] + - [3] + - [6] + - [7] + - [1, 2] + - [4, 5] +... +s:select{} +--- +- - [1, 2] + - [2] + - [3] + - [4, 5] + - [6] + - [7] +... +-- Test tuple_compare_sequential_nullable, +-- tuple_compare_with_key_sequential. +s:drop() +--- +... +s = box.schema.space.create('test', {engine = engine}) +--- +... +pk = s:create_index('pk') +--- +... +parts = {} +--- +... +parts[1] = {1, 'unsigned'} +--- +... +parts[2] = {2, 'unsigned', is_nullable = true} +--- +... +parts[3] = {3, 'unsigned', is_nullable = true} +--- +... +sk = s:create_index('sk', {parts = parts}) +--- +... +-- Compare full vs not full. +s:replace{1, 2, 3} +--- +- [1, 2, 3] +... +s:replace{3} +--- +- [3] +... +s:replace{2, 3} +--- +- [2, 3] +... +sk:select{} +--- +- - [1, 2, 3] + - [2, 3] + - [3] +... +sk:select{3, box.NULL} +--- +- - [3] +... +sk:select{3, box.NULL, box.NULL} +--- +- - [3] +... +sk:select{2} +--- +- - [2, 3] +... +sk:select{2, 3} +--- +- - [2, 3] +... +sk:select{3, 100} +--- +- [] +... +sk:select{3, box.NULL, 100} +--- +- [] +... +sk:select({3, box.NULL}, {iterator = 'GE'}) +--- +- - [3] +... +sk:select({3, box.NULL}, {iterator = 'LE'}) +--- +- - [3] + - [2, 3] + - [1, 2, 3] +... +s:select{} +--- +- - [1, 2, 3] + - [2, 3] + - [3] +... +-- Test tuple extract key for vinyl. +box.snapshot() +--- +- ok +... +sk:select{} +--- +- - [1, 2, 3] + - [2, 3] + - [3] +... +sk:select{3, box.NULL} +--- +- - [3] +... +sk:select{3, box.NULL, box.NULL} +--- +- - [3] +... +sk:select{2} +--- +- - [2, 3] +... +sk:select{2, 3} +--- +- - [2, 3] +... +sk:select{3, 100} +--- +- [] +... +sk:select{3, box.NULL, 100} +--- +- [] +... +sk:select({3, box.NULL}, {iterator = 'GE'}) +--- +- - [3] +... +sk:select({3, box.NULL}, {iterator = 'LE'}) +--- +- - [3] + - [2, 3] + - [1, 2, 3] +... +-- Test a tuple_compare_sequential() for a case, when there are +-- two equal tuples, but in one of them field count < unique field +-- count. +s:replace{1, box.NULL} +--- +- [1, null] +... +s:replace{1, box.NULL, box.NULL} +--- +- [1, null, null] +... +s:select{1} +--- +- - [1, null, null] +... +-- +-- Partially sequential keys. See tuple_extract_key.cc and +-- contains_sequential_parts template flag. +-- +s:drop() +--- +... +s = box.schema.space.create('test', {engine = engine}) +--- +... +pk = s:create_index('pk') +--- +... +parts = {} +--- +... +parts[1] = {2, 'unsigned', is_nullable = true} +--- +... +parts[2] = {3, 'unsigned', is_nullable = true} +--- +... +parts[3] = {5, 'unsigned', is_nullable = true} +--- +... +parts[4] = {6, 'unsigned', is_nullable = true} +--- +... +parts[5] = {4, 'unsigned', is_nullable = true} +--- +... +parts[6] = {7, 'unsigned', is_nullable = true} +--- +... +sk = s:create_index('sk', {parts = parts}) +--- +... +s:insert{1, 1, 1, 1, 1, 1, 1} +--- +- [1, 1, 1, 1, 1, 1, 1] +... +s:insert{8, 1, 1, 1, 1, box.NULL} +--- +- [8, 1, 1, 1, 1, null] +... +s:insert{9, 1, 1, 1, box.NULL} +--- +- [9, 1, 1, 1, null] +... +s:insert{6, 6} +--- +- [6, 6] +... +s:insert{10, 6, box.NULL} +--- +- [10, 6, null] +... +s:insert{2, 2, 2, 2, 2, 2} +--- +- [2, 2, 2, 2, 2, 2] +... +s:insert{7} +--- +- [7] +... +s:insert{5, 5, 5} +--- +- [5, 5, 5] +... +s:insert{3, 5, box.NULL, box.NULL, box.NULL} +--- +- [3, 5, null, null, null] +... +s:insert{4, 5, 5, 5, box.NULL} +--- +- [4, 5, 5, 5, null] +... +s:insert{11, 4, 4, 4} +--- +- [11, 4, 4, 4] +... +s:insert{12, 4, box.NULL, 4} +--- +- [12, 4, null, 4] +... +s:insert{13, 3, 3, 3, 3} +--- +- [13, 3, 3, 3, 3] +... +s:insert{14, box.NULL, 3, box.NULL, 3} +--- +- [14, null, 3, null, 3] +... +s:select{} +--- +- - [1, 1, 1, 1, 1, 1, 1] + - [2, 2, 2, 2, 2, 2] + - [3, 5, null, null, null] + - [4, 5, 5, 5, null] + - [5, 5, 5] + - [6, 6] + - [7] + - [8, 1, 1, 1, 1, null] + - [9, 1, 1, 1, null] + - [10, 6, null] + - [11, 4, 4, 4] + - [12, 4, null, 4] + - [13, 3, 3, 3, 3] + - [14, null, 3, null, 3] +... +sk:select{} +--- +- - [7] + - [14, null, 3, null, 3] + - [9, 1, 1, 1, null] + - [8, 1, 1, 1, 1, null] + - [1, 1, 1, 1, 1, 1, 1] + - [2, 2, 2, 2, 2, 2] + - [13, 3, 3, 3, 3] + - [12, 4, null, 4] + - [11, 4, 4, 4] + - [3, 5, null, null, null] + - [5, 5, 5] + - [4, 5, 5, 5, null] + - [6, 6] + - [10, 6, null] +... +sk:select{5, 5, box.NULL} +--- +- - [5, 5, 5] + - [4, 5, 5, 5, null] +... +sk:select{5, 5, box.NULL, 100} +--- +- [] +... +sk:select({7, box.NULL}, {iterator = 'LT'}) +--- +- - [10, 6, null] + - [6, 6] + - [4, 5, 5, 5, null] + - [5, 5, 5] + - [3, 5, null, null, null] + - [11, 4, 4, 4] + - [12, 4, null, 4] + - [13, 3, 3, 3, 3] + - [2, 2, 2, 2, 2, 2] + - [1, 1, 1, 1, 1, 1, 1] + - [8, 1, 1, 1, 1, null] + - [9, 1, 1, 1, null] + - [14, null, 3, null, 3] + - [7] +... +box.snapshot() +--- +- ok +... +sk:select{} +--- +- - [7] + - [14, null, 3, null, 3] + - [9, 1, 1, 1, null] + - [8, 1, 1, 1, 1, null] + - [1, 1, 1, 1, 1, 1, 1] + - [2, 2, 2, 2, 2, 2] + - [13, 3, 3, 3, 3] + - [12, 4, null, 4] + - [11, 4, 4, 4] + - [3, 5, null, null, null] + - [5, 5, 5] + - [4, 5, 5, 5, null] + - [6, 6] + - [10, 6, null] +... +sk:select{5, 5, box.NULL} +--- +- - [5, 5, 5] + - [4, 5, 5, 5, null] +... +sk:select{5, 5, box.NULL, 100} +--- +- [] +... +sk:select({7, box.NULL}, {iterator = 'LT'}) +--- +- - [10, 6, null] + - [6, 6] + - [4, 5, 5, 5, null] + - [5, 5, 5] + - [3, 5, null, null, null] + - [11, 4, 4, 4] + - [12, 4, null, 4] + - [13, 3, 3, 3, 3] + - [2, 2, 2, 2, 2, 2] + - [1, 1, 1, 1, 1, 1, 1] + - [8, 1, 1, 1, 1, null] + - [9, 1, 1, 1, null] + - [14, null, 3, null, 3] + - [7] +... +s:drop() +--- +... +-- +-- The main case of absent nullable fields - create an index over +-- them on not empty space (available on memtx only). +-- +s = box.schema.space.create('test', {engine = 'memtx'}) +--- +... +pk = s:create_index('pk') +--- +... +s:replace{1} +--- +- [1] +... +s:replace{2} +--- +- [2] +... +s:replace{3} +--- +- [3] +... +sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}}) +--- +... +s:replace{4} +--- +- [4] +... +s:replace{5, 6} +--- +- [5, 6] +... +s:replace{7, 8} +--- +- [7, 8] +... +s:replace{9, box.NULL} +--- +- [9, null] +... +s:select{} +--- +- - [1] + - [2] + - [3] + - [4] + - [5, 6] + - [7, 8] + - [9, null] +... +sk:select{} +--- +- - [1] + - [2] + - [3] + - [4] + - [9, null] + - [5, 6] + - [7, 8] +... +sk:select{box.NULL} +--- +- - [1] + - [2] + - [3] + - [4] + - [9, null] +... +s:drop() +--- +... +-- +-- The complex case: when an index part is_nullable is set to, +-- false and it changes min_field_count, this part must become +-- optional and turn on comparators for optional fields. See the +-- big comment in alter.cc in index_def_new_from_tuple(). +-- +s = box.schema.create_space('test', {engine = 'memtx'}) +--- +... +pk = s:create_index('pk') +--- +... +sk = s:create_index('sk', {parts = {2, 'unsigned'}}) +--- +... +s:replace{1, 1} +--- +- [1, 1] +... +s:replace{2, box.NULL} +--- +- error: 'Tuple field 2 type does not match one required by operation: expected unsigned' +... +s:select{} +--- +- - [1, 1] +... +sk:alter({parts = {{2, 'unsigned', is_nullable = true}}}) +--- +... +s:replace{20, box.NULL} +--- +- [20, null] +... +sk:select{} +--- +- - [20, null] + - [1, 1] +... +s:replace{10} +--- +- [10] +... +sk:select{} +--- +- - [10] + - [20, null] + - [1, 1] +... +s:replace{40} +--- +- [40] +... +sk:select{} +--- +- - [10] + - [20, null] + - [40] + - [1, 1] +... +s:drop() +--- +... +-- +-- Check that if an index alter makes a field be optional, and +-- this field is used in another index, then this another index +-- is updated too. Case of @locker. +-- +s = box.schema.space.create('test', {engine = 'memtx'}) +--- +... +_ = s:create_index('pk') +--- +... +i1 = s:create_index('i1', {parts = {2, 'unsigned', 3, 'unsigned'}}) +--- +... +i2 = s:create_index('i2', {parts = {3, 'unsigned', 2, 'unsigned'}}) +--- +... +i1:alter{parts = {{2, 'unsigned'}, {3, 'unsigned', is_nullable = true}}} +--- +... +-- i2 alter makes i1 contain optional part. Its key_def and +-- comparators must be updated. +i2:alter{parts = {{3, 'unsigned', is_nullable = true}, {2, 'unsigned'}}} +--- +... +s:insert{1, 1} +--- +- [1, 1] +... +s:insert{100, 100} +--- +- [100, 100] +... +s:insert{50, 50} +--- +- [50, 50] +... +s:insert{25, 25, 25} +--- +- [25, 25, 25] +... +s:insert{75, 75, 75} +--- +- [75, 75, 75] +... +s:select{} +--- +- - [1, 1] + - [25, 25, 25] + - [50, 50] + - [75, 75, 75] + - [100, 100] +... +i1:select{} +--- +- - [1, 1] + - [25, 25, 25] + - [50, 50] + - [75, 75, 75] + - [100, 100] +... +i2:select{} +--- +- - [1, 1] + - [50, 50] + - [100, 100] + - [25, 25, 25] + - [75, 75, 75] +... +i2:select{box.NULL, 50} +--- +- - [50, 50] +... +i2:select{} +--- +- - [1, 1] + - [50, 50] + - [100, 100] + - [25, 25, 25] + - [75, 75, 75] +... +s:drop() +--- +... diff --git a/test/engine/null.test.lua b/test/engine/null.test.lua index 777a847f0d3abdf7cc3dc642e2db65f2315966b1..7f5a7dd33216237ce2a57ba7e29afc92b5a54b79 100644 --- a/test/engine/null.test.lua +++ b/test/engine/null.test.lua @@ -277,3 +277,187 @@ s:replace{10, box.NULL} s:replace{150, box.NULL} sk:select{} s:drop() + +-- +-- gh-2988: allow absense of tail nullable indexed fields. +-- +s = box.schema.space.create('test', {engine = engine}) +pk = s:create_index('pk') +sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}}) + +-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath. + +s:replace{} -- Fail +-- Compare full vs not full. +s:replace{2} +s:replace{1, 2} +s:select{} +sk:select{box.NULL} +sk:select{2} +-- Compare not full vs full. +s:replace{4, 5} +s:replace{3} +s:select{} +sk:select{box.NULL} +sk:select{5} +-- Compare extended keys. +s:replace{7} +s:replace{6} +s:select{} +sk:select{box.NULL} +sk:select{} +-- Test tuple extract key during dump for vinyl. +box.snapshot() +sk:select{} +s:select{} + +-- Test tuple_compare_sequential_nullable, +-- tuple_compare_with_key_sequential. +s:drop() +s = box.schema.space.create('test', {engine = engine}) +pk = s:create_index('pk') +parts = {} +parts[1] = {1, 'unsigned'} +parts[2] = {2, 'unsigned', is_nullable = true} +parts[3] = {3, 'unsigned', is_nullable = true} +sk = s:create_index('sk', {parts = parts}) +-- Compare full vs not full. +s:replace{1, 2, 3} +s:replace{3} +s:replace{2, 3} +sk:select{} +sk:select{3, box.NULL} +sk:select{3, box.NULL, box.NULL} +sk:select{2} +sk:select{2, 3} +sk:select{3, 100} +sk:select{3, box.NULL, 100} +sk:select({3, box.NULL}, {iterator = 'GE'}) +sk:select({3, box.NULL}, {iterator = 'LE'}) +s:select{} +-- Test tuple extract key for vinyl. +box.snapshot() +sk:select{} +sk:select{3, box.NULL} +sk:select{3, box.NULL, box.NULL} +sk:select{2} +sk:select{2, 3} +sk:select{3, 100} +sk:select{3, box.NULL, 100} +sk:select({3, box.NULL}, {iterator = 'GE'}) +sk:select({3, box.NULL}, {iterator = 'LE'}) + +-- Test a tuple_compare_sequential() for a case, when there are +-- two equal tuples, but in one of them field count < unique field +-- count. +s:replace{1, box.NULL} +s:replace{1, box.NULL, box.NULL} +s:select{1} + +-- +-- Partially sequential keys. See tuple_extract_key.cc and +-- contains_sequential_parts template flag. +-- +s:drop() +s = box.schema.space.create('test', {engine = engine}) +pk = s:create_index('pk') +parts = {} +parts[1] = {2, 'unsigned', is_nullable = true} +parts[2] = {3, 'unsigned', is_nullable = true} +parts[3] = {5, 'unsigned', is_nullable = true} +parts[4] = {6, 'unsigned', is_nullable = true} +parts[5] = {4, 'unsigned', is_nullable = true} +parts[6] = {7, 'unsigned', is_nullable = true} +sk = s:create_index('sk', {parts = parts}) +s:insert{1, 1, 1, 1, 1, 1, 1} +s:insert{8, 1, 1, 1, 1, box.NULL} +s:insert{9, 1, 1, 1, box.NULL} +s:insert{6, 6} +s:insert{10, 6, box.NULL} +s:insert{2, 2, 2, 2, 2, 2} +s:insert{7} +s:insert{5, 5, 5} +s:insert{3, 5, box.NULL, box.NULL, box.NULL} +s:insert{4, 5, 5, 5, box.NULL} +s:insert{11, 4, 4, 4} +s:insert{12, 4, box.NULL, 4} +s:insert{13, 3, 3, 3, 3} +s:insert{14, box.NULL, 3, box.NULL, 3} +s:select{} +sk:select{} +sk:select{5, 5, box.NULL} +sk:select{5, 5, box.NULL, 100} +sk:select({7, box.NULL}, {iterator = 'LT'}) +box.snapshot() +sk:select{} +sk:select{5, 5, box.NULL} +sk:select{5, 5, box.NULL, 100} +sk:select({7, box.NULL}, {iterator = 'LT'}) + +s:drop() + +-- +-- The main case of absent nullable fields - create an index over +-- them on not empty space (available on memtx only). +-- +s = box.schema.space.create('test', {engine = 'memtx'}) +pk = s:create_index('pk') +s:replace{1} +s:replace{2} +s:replace{3} +sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}}) +s:replace{4} +s:replace{5, 6} +s:replace{7, 8} +s:replace{9, box.NULL} +s:select{} +sk:select{} +sk:select{box.NULL} +s:drop() + +-- +-- The complex case: when an index part is_nullable is set to, +-- false and it changes min_field_count, this part must become +-- optional and turn on comparators for optional fields. See the +-- big comment in alter.cc in index_def_new_from_tuple(). +-- +s = box.schema.create_space('test', {engine = 'memtx'}) +pk = s:create_index('pk') +sk = s:create_index('sk', {parts = {2, 'unsigned'}}) +s:replace{1, 1} +s:replace{2, box.NULL} +s:select{} +sk:alter({parts = {{2, 'unsigned', is_nullable = true}}}) +s:replace{20, box.NULL} +sk:select{} +s:replace{10} +sk:select{} +s:replace{40} +sk:select{} +s:drop() + +-- +-- Check that if an index alter makes a field be optional, and +-- this field is used in another index, then this another index +-- is updated too. Case of @locker. +-- +s = box.schema.space.create('test', {engine = 'memtx'}) +_ = s:create_index('pk') +i1 = s:create_index('i1', {parts = {2, 'unsigned', 3, 'unsigned'}}) +i2 = s:create_index('i2', {parts = {3, 'unsigned', 2, 'unsigned'}}) + +i1:alter{parts = {{2, 'unsigned'}, {3, 'unsigned', is_nullable = true}}} +-- i2 alter makes i1 contain optional part. Its key_def and +-- comparators must be updated. +i2:alter{parts = {{3, 'unsigned', is_nullable = true}, {2, 'unsigned'}}} +s:insert{1, 1} +s:insert{100, 100} +s:insert{50, 50} +s:insert{25, 25, 25} +s:insert{75, 75, 75} +s:select{} +i1:select{} +i2:select{} +i2:select{box.NULL, 50} +i2:select{} +s:drop()