diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc index dc8b7e87f823eacad82aa961d18a4a3b6d60caa8..9936eb17364dea047ac37edc4b8e2b0677bbb89a 100644 --- a/src/box/tuple_compare.cc +++ b/src/box/tuple_compare.cc @@ -810,54 +810,67 @@ tuple_compare_with_key_slowpath(struct tuple *tuple, hint_t tuple_hint, return 0; } +/** + * Compare key parts and skip compared equally. After function call, keys + * will point to the first field that differ or to the end of key or + * part_count + 1 field in order. + * Key arguments must not be NULL, allowed to be NULL if dereferenced. + */ template<bool is_nullable> static inline int -key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count, - struct key_def *key_def) +key_compare_and_skip_parts(const char **key_a, const char **key_b, + uint32_t part_count, struct key_def *key_def) { assert(is_nullable == key_def->is_nullable); - assert((key_a != NULL && key_b != NULL) || part_count == 0); + assert(key_a != NULL && key_b != NULL); + assert((*key_a != NULL && *key_b != NULL) || part_count == 0); struct key_part *part = key_def->parts; + int rc; if (likely(part_count == 1)) { + enum mp_type a_type = mp_typeof(**key_a); + enum mp_type b_type = mp_typeof(**key_b); if (! is_nullable) { - return tuple_compare_field(key_a, key_b, part->type, - part->coll); - } - enum mp_type a_type = mp_typeof(*key_a); - enum mp_type b_type = mp_typeof(*key_b); - if (a_type == MP_NIL) { - return b_type == MP_NIL ? 0 : -1; + rc = tuple_compare_field(*key_a, *key_b, part->type, + part->coll); + } else if (a_type == MP_NIL) { + rc = b_type == MP_NIL ? 0 : -1; } else if (b_type == MP_NIL) { - return 1; + rc = 1; } else { - return tuple_compare_field_with_type(key_a, a_type, - key_b, b_type, - part->type, - part->coll); + rc = tuple_compare_field_with_type(*key_a, a_type, + *key_b, b_type, + part->type, + part->coll); + } + /* If key parts are equals, we must skip them. */ + if (rc == 0) { + mp_next(key_a); + mp_next(key_b); } + return rc; } struct key_part *end = part + part_count; - int rc; - for (; part < end; ++part, mp_next(&key_a), mp_next(&key_b)) { + for (; part < end; ++part, mp_next(key_a), mp_next(key_b)) { if (! is_nullable) { - rc = tuple_compare_field(key_a, key_b, part->type, + rc = tuple_compare_field(*key_a, *key_b, part->type, part->coll); if (rc != 0) return rc; else continue; } - enum mp_type a_type = mp_typeof(*key_a); - enum mp_type b_type = mp_typeof(*key_b); + enum mp_type a_type = mp_typeof(**key_a); + enum mp_type b_type = mp_typeof(**key_b); if (a_type == MP_NIL) { if (b_type != MP_NIL) return -1; } else if (b_type == MP_NIL) { return 1; } else { - rc = tuple_compare_field_with_type(key_a, a_type, key_b, - b_type, part->type, + rc = tuple_compare_field_with_type(*key_a, a_type, + *key_b, b_type, + part->type, part->coll); if (rc != 0) return rc; @@ -866,6 +879,15 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count, return 0; } +template<bool is_nullable> +static inline int +key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count, + struct key_def *key_def) +{ + return key_compare_and_skip_parts<is_nullable>(&key_a, &key_b, + part_count, key_def); +} + template<bool is_nullable, bool has_optional_parts> static inline int tuple_compare_with_key_sequential(struct tuple *tuple, hint_t tuple_hint, @@ -1447,17 +1469,48 @@ func_index_compare_with_key(struct tuple *tuple, hint_t tuple_hint, const char *key, uint32_t part_count, hint_t key_hint, struct key_def *key_def) { - (void)tuple; (void)key_hint; + (void)key_hint; assert(key_def->for_func_index); assert(is_nullable == key_def->is_nullable); const char *tuple_key = tuple_data((struct tuple *)tuple_hint); assert(mp_typeof(*tuple_key) == MP_ARRAY); uint32_t tuple_key_count = mp_decode_array(&tuple_key); - part_count = MIN(part_count, tuple_key_count); + uint32_t cmp_part_count = MIN(part_count, tuple_key_count); + cmp_part_count = MIN(cmp_part_count, key_def->part_count); + int rc = key_compare_and_skip_parts<is_nullable>(&tuple_key, &key, + cmp_part_count, + key_def); + if (rc != 0) + return rc; + /* Equals if nothing to compare. */ + if (part_count == cmp_part_count || + key_def->part_count == cmp_part_count) + return 0; + /* + * Now we know that tuple_key count is less than key part_count + * and key_def part_count, so let's keep comparing, but with + * original tuple fields. We will compare parts of primary key, + * it cannot contain nullable parts so the code is simplified + * correspondingly. Also, all the key parts, corresponding to + * func key, were already skipped. + */ + const char *tuple_raw = tuple_data(tuple); + struct tuple_format *format = tuple_format(tuple); + const uint32_t *field_map = tuple_field_map(tuple); + const char *field; part_count = MIN(part_count, key_def->part_count); - return key_compare_parts<is_nullable>(tuple_key, key, part_count, - key_def); + for (uint32_t i = cmp_part_count; i < part_count; i++) { + struct key_part *part = &key_def->parts[i]; + field = tuple_field_raw_by_part(format, tuple_raw, field_map, + part, MULTIKEY_NONE); + assert(field != NULL); + rc = tuple_compare_field(field, key, part->type, part->coll); + mp_next(&key); + if (rc != 0) + return rc; + } + return 0; } #undef KEY_COMPARATOR diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index d603d7cf0118758f3e69ad7ae6006ecdeea85635..1f29ee91c37c62abcd670ed5efd73a2b61181cc8 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -558,3 +558,8 @@ create_unit_test(PREFIX iterator_position SOURCES iterator_position.c box_test_utils.c LIBRARIES unit box core ) + +create_unit_test(PREFIX key_def + SOURCES key_def.c box_test_utils.c + LIBRARIES unit box core +) diff --git a/test/unit/key_def.c b/test/unit/key_def.c new file mode 100644 index 0000000000000000000000000000000000000000..0546a2b6e1dd8befb8cf442b79d5d68c18f49174 --- /dev/null +++ b/test/unit/key_def.c @@ -0,0 +1,259 @@ +#include <stdarg.h> +#include <stddef.h> +#include <string.h> + +#include "fiber.h" +#include "key_def.h" +#include "memory.h" +#include "msgpuck.h" +#include "small/region.h" +#include "trivia/util.h" +#include "tuple.h" + +#define UNIT_TAP_COMPATIBLE 1 +#include "unit.h" + +static char * +test_key_new_va(const char *format, va_list ap) +{ + struct region *region = &fiber()->gc; + size_t region_svp = region_used(region); + + /* Format the MsgPack key definition. */ + const size_t mp_buf_size = 1024; + char *mp_buf = region_alloc(region, mp_buf_size); + fail_if(mp_buf == NULL); + size_t mp_size = mp_vformat(mp_buf, mp_buf_size, format, ap); + fail_if(mp_size > mp_buf_size); + + /* Create a key. */ + char *key = xmalloc(mp_size); + memcpy(key, mp_buf, mp_size); + + region_truncate(region, region_svp); + return key; +} + +/** Creates a key from a MsgPack format (see mp_format). */ +static char * +test_key_new(const char *format, ...) +{ + va_list ap; + va_start(ap, format); + char *key = test_key_new_va(format, ap); + fail_unless(mp_typeof(*key) == MP_ARRAY); + va_end(ap); + return key; +} + +static struct tuple * +test_tuple_new_va(const char *format, va_list ap) +{ + struct region *region = &fiber()->gc; + size_t region_svp = region_used(region); + + /* Format the MsgPack key definition. */ + const size_t mp_buf_size = 1024; + char *mp_buf = region_alloc(region, mp_buf_size); + fail_if(mp_buf == NULL); + size_t mp_size = mp_vformat(mp_buf, mp_buf_size, format, ap); + fail_if(mp_size > mp_buf_size); + + /* Create a tuple. */ + struct tuple *tuple = tuple_new(tuple_format_runtime, mp_buf, + mp_buf + mp_size); + fail_if(tuple == NULL); + + region_truncate(region, region_svp); + return tuple; +} + +/** Creates a tuple from a MsgPack format (see mp_format). */ +static struct tuple * +test_tuple_new(const char *format, ...) +{ + va_list ap; + va_start(ap, format); + struct tuple *tuple = test_tuple_new_va(format, ap); + va_end(ap); + return tuple; +} + +static struct key_def * +test_key_def_new_va(const char *format, va_list ap, bool for_func_index) +{ + struct region *region = &fiber()->gc; + size_t region_svp = region_used(region); + + /* Format the MsgPack key definition. */ + const size_t mp_buf_size = 1024; + char *mp_buf = region_alloc(region, mp_buf_size); + fail_if(mp_buf == NULL); + fail_if(mp_vformat(mp_buf, mp_buf_size, format, ap) > mp_buf_size); + + /* Decode the key parts. */ + const char *parts = mp_buf; + uint32_t part_count = mp_decode_array(&parts); + struct key_part_def *part_def = region_alloc( + region, sizeof(*part_def) * part_count); + fail_if(part_def == NULL); + fail_if(key_def_decode_parts(part_def, part_count, &parts, + /*fields=*/NULL, /*field_count=*/0, + region) != 0); + + /* Create a key def. */ + struct key_def *def = key_def_new(part_def, part_count, for_func_index); + fail_if(def == NULL); + key_def_update_optionality(def, 0); + + region_truncate(region, region_svp); + return def; +} + +/** Creates a key_def from a MsgPack format (see mp_format). */ +static struct key_def * +test_key_def_new(const char *format, ...) +{ + va_list ap; + va_start(ap, format); + struct key_def *def = test_key_def_new_va(format, ap, + /*for_func_index=*/false); + va_end(ap); + return def; +} + +/** Creates a functional index key_def from a MsgPack format (see mp_format). */ +static struct key_def * +test_key_def_new_func(const char *format, ...) +{ + va_list ap; + va_start(ap, format); + struct key_def *def = test_key_def_new_va(format, ap, + /*for_func_index=*/true); + va_end(ap); + return def; +} + +/** + * Checks that tuple_compare_with_key with cmp_def of functional index + * returns the same result as comparison of concatenated func and primary keys. + */ +static void +test_check_tuple_compare_with_key_func( + struct key_def *cmp_def, struct tuple *tuple, + struct tuple *func_key, struct key_def *model_def, + struct tuple *model, const char *key) +{ + fail_unless(cmp_def->for_func_index); + fail_if(model_def->for_func_index); + const char *key_parts = key; + uint32_t part_count = mp_decode_array(&key_parts); + int a = tuple_compare_with_key(tuple, (hint_t)func_key, key_parts, + part_count, HINT_NONE, cmp_def); + int b = tuple_compare_with_key(model, HINT_NONE, key_parts, + part_count, HINT_NONE, model_def); + a = a > 0 ? 1 : a < 0 ? -1 : 0; + b = b > 0 ? 1 : b < 0 ? -1 : 0; + is(a, b, "tuple_compare_with_key_func(%s/%s, %s) = %d, expected %d", + tuple_str(tuple), tuple_str(func_key), mp_str(key), a, b); +} + +static void +test_func_compare_with_key(void) +{ + plan(14); + header(); + + struct key_def *def = test_key_def_new_func( + "[{%s%u%s%s}{%s%u%s%s}]", + "field", 0, "type", "unsigned", + "field", 1, "type", "string"); + /* Skip first field to check if func comparator can handle this. */ + struct key_def *pk_def = test_key_def_new( + "[{%s%u%s%s}{%s%u%s%s}]", + "field", 1, "type", "unsigned", + "field", 2, "type", "string"); + struct key_def *cmp_def = key_def_merge(def, pk_def); + /* + * Model def is a copy of cmp_def, but not for_func_index, and hence + * it has general implementation of tuple_compare_with_key method. + */ + struct key_def *model_def = test_key_def_new( + "[{%s%u%s%s}{%s%u%s%s}{%s%u%s%s}{%s%u%s%s}]", + "field", 0, "type", "unsigned", + "field", 1, "type", "string", + "field", 3, "type", "unsigned", + "field", 4, "type", "string"); + struct tuple *func_key = test_tuple_new("[%u%s]", 20, "foo"); + struct tuple *tuple = test_tuple_new("[%u%u%s]", 200, 10, "cpp"); + /* + * Model tuple is concatenated func_key and tuple's primary key. + * Note that the 3rd field does not take part in comparison, so it + * is intentionally different from the first field of tuple, which is + * not compared too. + */ + struct tuple *model = + test_tuple_new("[%u%s%u%u%s]", 20, "foo", 100, 10, "cpp"); + char *keys[] = { + test_key_new("[]"), + test_key_new("[%u]", 10), + test_key_new("[%u]", 20), + test_key_new("[%u]", 30), + test_key_new("[%u%s]", 10, "foo"), + test_key_new("[%u%s]", 20, "foo"), + test_key_new("[%u%s]", 20, "bar"), + test_key_new("[%u%s]", 30, "foo"), + test_key_new("[%u%s%u]", 20, "foo", 5), + test_key_new("[%u%s%u]", 20, "foo", 10), + test_key_new("[%u%s%u]", 20, "foo", 15), + test_key_new("[%u%s%u%s]", 20, "foo", 10, "bar"), + test_key_new("[%u%s%u%s]", 20, "foo", 10, "cpp"), + test_key_new("[%u%s%u%s]", 20, "foo", 10, "foo"), + }; + for (unsigned int i = 0; i < lengthof(keys); i++) { + test_check_tuple_compare_with_key_func( + cmp_def, tuple, func_key, model_def, model, keys[i]); + } + for (unsigned int i = 0; i < lengthof(keys); i++) + free(keys[i]); + tuple_delete(func_key); + key_def_delete(def); + key_def_delete(pk_def); + key_def_delete(cmp_def); + + footer(); + check_plan(); +} + +static int +test_main(void) +{ + plan(1); + header(); + + test_func_compare_with_key(); + + footer(); + return check_plan(); +} + +static uint32_t +test_field_name_hash(const char *str, uint32_t len) +{ + return str[0] + len; +} + +int +main(void) +{ + memory_init(); + fiber_init(fiber_c_invoke); + tuple_init(test_field_name_hash); + + int rc = test_main(); + + tuple_free(); + fiber_free(); + memory_free(); + return rc; +}