diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc index dcfb4f63648c539a1008dbd80de3755dbfb6969c..3c6adfbc6ca1f58737739ad23ec5b9b684d25a2a 100644 --- a/src/box/tuple_compare.cc +++ b/src/box/tuple_compare.cc @@ -589,18 +589,12 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key, return 0; } -/** - * is_sequential_nullable_tuples used to determine if this - * function is used to compare two tuples, which keys are - * sequential. In such a case it is necessary to compare them - * using primary parts if a NULL was met. - */ -template<bool is_nullable, bool is_sequential_nullable_tuples> +template<bool is_nullable> static inline int key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count, const struct key_def *key_def) { - assert(!is_sequential_nullable_tuples || is_nullable); + assert(is_nullable == key_def->is_nullable); assert((key_a != NULL && key_b != NULL) || part_count == 0); const struct key_part *part = key_def->parts; if (likely(part_count == 1)) { @@ -622,7 +616,6 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count, } } - bool was_null_met = false; const struct key_part *end = part + part_count; int rc; for (; part < end; ++part, mp_next(&key_a), mp_next(&key_b)) { @@ -639,7 +632,6 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count, if (a_type == MP_NIL) { if (b_type != MP_NIL) return -1; - was_null_met = true; } else if (b_type == MP_NIL) { return 1; } else { @@ -650,15 +642,6 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count, return rc; } } - - if (!is_sequential_nullable_tuples || !was_null_met) - return 0; - end = key_def->parts + key_def->unique_part_count; - for (; part < end; ++part, mp_next(&key_a), mp_next(&key_b)) { - rc = tuple_compare_field(key_a, key_b, part->type, part->coll); - if (rc != 0) - return rc; - } return 0; } @@ -673,8 +656,8 @@ tuple_compare_with_key_sequential(const struct tuple *tuple, assert(tuple_field_count >= key_def->part_count); assert(part_count <= key_def->part_count); (void) tuple_field_count; - return key_compare_parts<is_nullable, false>(tuple_key, key, part_count, - key_def); + return key_compare_parts<is_nullable>(tuple_key, key, part_count, + key_def); } int @@ -688,49 +671,64 @@ key_compare(const char *key_a, const char *key_b, uint32_t part_count = MIN(part_count_a, part_count_b); assert(part_count <= key_def->part_count); if (! key_def->is_nullable) { - return key_compare_parts<false, false>(key_a, key_b, part_count, - key_def); + return key_compare_parts<false>(key_a, key_b, part_count, + key_def); } else { - return key_compare_parts<true, false>(key_a, key_b, part_count, - key_def); + return key_compare_parts<true>(key_a, key_b, part_count, + key_def); } } +template<bool is_nullable> static int tuple_compare_sequential(const struct tuple *tuple_a, const struct tuple *tuple_b, const struct key_def *key_def) { assert(key_def_is_sequential(key_def)); + assert(is_nullable == key_def->is_nullable); const char *key_a = tuple_data(tuple_a); - uint32_t field_count_a = mp_decode_array(&key_a); - assert(field_count_a >= key_def->part_count); - (void) field_count_a; + uint32_t fc_a = mp_decode_array(&key_a); const char *key_b = tuple_data(tuple_b); - uint32_t field_count_b = mp_decode_array(&key_b); - assert(field_count_b >= key_def->part_count); - (void) field_count_b; - return key_compare_parts<false, false>(key_a, key_b, - key_def->part_count, key_def); -} - -static int -tuple_compare_sequential_nullable(const struct tuple *tuple_a, - const struct tuple *tuple_b, - const struct key_def *key_def) -{ - assert(key_def->is_nullable); - assert(key_def_is_sequential(key_def)); - const char *key_a = tuple_data(tuple_a); - uint32_t field_count = mp_decode_array(&key_a); - assert(field_count >= key_def->part_count); - const char *key_b = tuple_data(tuple_b); - field_count = mp_decode_array(&key_b); - assert(field_count >= key_def->part_count); - (void) field_count; - return key_compare_parts<true, true>(key_a, key_b, - key_def->unique_part_count, - key_def); + uint32_t fc_b = mp_decode_array(&key_b); + if (! is_nullable) { + assert(fc_a >= key_def->part_count); + assert(fc_b >= key_def->part_count); + return key_compare_parts<false>(key_a, key_b, + key_def->part_count, key_def); + } + bool was_null_met = false; + const struct key_part *part = key_def->parts; + const struct key_part *end = part + key_def->unique_part_count; + int rc; + uint32_t i = 0; + for (; part < end; ++part, ++i, mp_next(&key_a), mp_next(&key_b)) { + enum mp_type a_type = mp_typeof(*key_a); + enum mp_type b_type = mp_typeof(*key_b); + if (a_type == MP_NIL) { + if (b_type != MP_NIL) + return -1; + was_null_met = true; + } else if (b_type == MP_NIL) { + return 1; + } else { + rc = tuple_compare_field_with_hint(key_a, a_type, key_b, + b_type, part->type, + part->coll); + if (rc != 0) + return rc; + } + } + if (! was_null_met) + return 0; + end = key_def->parts + key_def->part_count; + for (; part < end; ++part, ++i, mp_next(&key_a), mp_next(&key_b)) { + rc = tuple_compare_field(key_a, key_b, part->type, + part->coll); + if (rc != 0) + return rc; + } + return 0; } template <int TYPE> @@ -916,8 +914,9 @@ tuple_compare_t tuple_compare_create(const struct key_def *def) { if (def->is_nullable) { if (key_def_is_sequential(def)) - return tuple_compare_sequential_nullable; - return tuple_compare_slowpath<true>; + return tuple_compare_sequential<true>; + else + return tuple_compare_slowpath<true>; } if (!key_def_has_collation(def)) { /* Precalculated comparators don't use collation */ @@ -936,8 +935,9 @@ tuple_compare_create(const struct key_def *def) { } } if (key_def_is_sequential(def)) - return tuple_compare_sequential; - return tuple_compare_slowpath<false>; + return tuple_compare_sequential<false>; + else + return tuple_compare_slowpath<false>; } /* }}} tuple_compare */