diff --git a/changelogs/unreleased/gh-8587-unique-violation-in-functional-index-with-nullable.md b/changelogs/unreleased/gh-8587-unique-violation-in-functional-index-with-nullable.md new file mode 100644 index 0000000000000000000000000000000000000000..7b7c8e20b509cdd293482a4a7c8c4b8688bf9147 --- /dev/null +++ b/changelogs/unreleased/gh-8587-unique-violation-in-functional-index-with-nullable.md @@ -0,0 +1,4 @@ +## bugfix/core + +* Fixed a bug when a tuple could be inserted even if it violates a `unique` + constraint of a functional index that has a nullable part (gh-8587). diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc index 6bf9310fe05b070250e7c01b080d9a6747dce996..fc34797a425acc96cce25019a68bef3e06d44169 100644 --- a/src/box/tuple_compare.cc +++ b/src/box/tuple_compare.cc @@ -674,7 +674,7 @@ tuple_compare_slowpath(struct tuple *tuple_a, hint_t tuple_a_hint, if (!is_nullable || !was_null_met) return 0; /* - * Inxex parts are equal and contain NULLs. So use + * Index parts are equal and contain NULLs. So use * extended parts only. */ end = key_def->parts + key_def->part_count; @@ -819,17 +819,21 @@ tuple_compare_with_key_slowpath(struct tuple *tuple, hint_t tuple_hint, * will point to the first field that differ or to the end of key or * part_count + 1 field in order. * Key arguments must not be NULL, allowed to be NULL if dereferenced. + * Set was_null_met to true if at least one part is NULL, to false otherwise. */ template<bool is_nullable> static inline int key_compare_and_skip_parts(const char **key_a, const char **key_b, - uint32_t part_count, struct key_def *key_def) + uint32_t part_count, struct key_def *key_def, + bool *was_null_met) { assert(is_nullable == key_def->is_nullable); assert(key_a != NULL && key_b != NULL); assert((*key_a != NULL && *key_b != NULL) || part_count == 0); struct key_part *part = key_def->parts; int rc; + *was_null_met = false; + if (likely(part_count == 1)) { enum mp_type a_type = mp_typeof(**key_a); enum mp_type b_type = mp_typeof(**key_b); @@ -838,8 +842,10 @@ key_compare_and_skip_parts(const char **key_a, const char **key_b, part->coll); } else if (a_type == MP_NIL) { rc = b_type == MP_NIL ? 0 : -1; + *was_null_met = true; } else if (b_type == MP_NIL) { rc = 1; + *was_null_met = true; } else { rc = tuple_compare_field_with_type(*key_a, a_type, *key_b, b_type, @@ -867,9 +873,11 @@ key_compare_and_skip_parts(const char **key_a, const char **key_b, enum mp_type a_type = mp_typeof(**key_a); enum mp_type b_type = mp_typeof(**key_b); if (a_type == MP_NIL) { + *was_null_met = true; if (b_type != MP_NIL) return -1; } else if (b_type == MP_NIL) { + *was_null_met = true; return 1; } else { rc = tuple_compare_field_with_type(*key_a, a_type, @@ -886,10 +894,11 @@ key_compare_and_skip_parts(const char **key_a, const char **key_b, template<bool is_nullable> static inline int key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count, - struct key_def *key_def) + struct key_def *key_def, bool *was_null_met) { return key_compare_and_skip_parts<is_nullable>(&key_a, &key_b, - part_count, key_def); + part_count, key_def, + was_null_met); } template<bool is_nullable, bool has_optional_parts> @@ -914,8 +923,9 @@ tuple_compare_with_key_sequential(struct tuple *tuple, hint_t tuple_hint, assert(field_count >= part_count); cmp_part_count = part_count; } + bool unused; rc = key_compare_parts<is_nullable>(tuple_key, key, cmp_part_count, - key_def); + key_def, &unused); if (!has_optional_parts || rc != 0) return rc; /* @@ -949,12 +959,13 @@ key_compare(const char *key_a, uint32_t part_count_a, hint_t key_a_hint, assert(part_count_b <= key_def->part_count); uint32_t part_count = MIN(part_count_a, part_count_b); assert(part_count <= key_def->part_count); + bool unused; if (! key_def->is_nullable) { return key_compare_parts<false>(key_a, key_b, part_count, - key_def); + key_def, &unused); } else { return key_compare_parts<true>(key_a, key_b, part_count, - key_def); + key_def, &unused); } } @@ -978,8 +989,10 @@ tuple_compare_sequential(struct tuple *tuple_a, hint_t tuple_a_hint, if (!has_optional_parts && !is_nullable) { assert(fc_a >= key_def->part_count); assert(fc_b >= key_def->part_count); + bool unused; return key_compare_parts<false>(key_a, key_b, - key_def->part_count, key_def); + key_def->part_count, key_def, + &unused); } bool was_null_met = false; struct key_part *part = key_def->parts; @@ -1400,13 +1413,17 @@ static const comparator_with_key_signature cmp_wk_arr[] = { /** * A functional index tuple compare. - * tuple_a_hint and tuple_b_hint are expected to be valid - * pointers to functional key memory. These keys have been already - * validated and are represented as a MsgPack array with exactly - * func_index_part_count parts. The cmp_def has part_count > func_index_part_count, - * since it was produced by key_def_merge() of the functional key part - * and the primary key. So its tail parts are taken from primary - * index key definition. + * tuple_a_hint and tuple_b_hint are expected to be valid pointers to functional + * key memory. These keys are represented as a MsgPack array with the number of + * elements equal to the functional index definition part_count (let's denote + * it as func_index_part_count). The keys have been already validated by + * key_list_iterator_next(). + * + * In case of unique non-nullable index, the non-extended key_def is used as a + * cmp_def (see memtx_tree_index_new_tpl()). Otherwise, the extended cmp_def has + * part_count > func_index_part_count, since it was produced by key_def_merge() + * of the functional key part and the primary key. So its tail parts are taken + * from primary index key definition. */ template<bool is_nullable> static inline int @@ -1423,12 +1440,24 @@ func_index_compare(struct tuple *tuple_a, hint_t tuple_a_hint, uint32_t part_count_a = mp_decode_array(&key_a); assert(mp_typeof(*key_b) == MP_ARRAY); uint32_t part_count_b = mp_decode_array(&key_b); + assert(part_count_a == part_count_b); + uint32_t key_part_count = part_count_a; + (void)part_count_b; - uint32_t key_part_count = MIN(part_count_a, part_count_b); + bool was_null_met; int rc = key_compare_parts<is_nullable>(key_a, key_b, key_part_count, - cmp_def); + cmp_def, &was_null_met); if (rc != 0) return rc; + /* + * Tuples with nullified fields may violate the uniqueness constraint + * so if we encountered a nullified field while comparing the secondary + * key parts we must proceed with comparing the primary key parts even + * if the secondary index is unique. + */ + if (key_part_count == cmp_def->unique_part_count && + (!is_nullable || !was_null_met)) + return 0; /* * Primary index definition key compare. * It cannot contain nullable parts so the code is @@ -1481,9 +1510,10 @@ func_index_compare_with_key(struct tuple *tuple, hint_t tuple_hint, uint32_t tuple_key_count = mp_decode_array(&tuple_key); uint32_t cmp_part_count = MIN(part_count, tuple_key_count); cmp_part_count = MIN(cmp_part_count, key_def->part_count); + bool unused; int rc = key_compare_and_skip_parts<is_nullable>(&tuple_key, &key, cmp_part_count, - key_def); + key_def, &unused); if (rc != 0) return rc; /* Equals if nothing to compare. */ diff --git a/test/box-luatest/gh_8587_func_index_unique_violation_test.lua b/test/box-luatest/gh_8587_func_index_unique_violation_test.lua new file mode 100644 index 0000000000000000000000000000000000000000..2ef8fcefed05249777defeb75848ceedb1134d86 --- /dev/null +++ b/test/box-luatest/gh_8587_func_index_unique_violation_test.lua @@ -0,0 +1,106 @@ +local t = require('luatest') + +local g = t.group('gh-8587', t.helpers.matrix{ + is_unique = {true, false}, + is_nullable = {true, false}}) +g.before_all(function(cg) + local server = require('luatest.server') + cg.server = server:new{alias = 'master'} + cg.server:start() +end) + +g.after_all(function(cg) + cg.server:drop() +end) + +g.after_each(function(cg) + cg.server:exec(function() + if box.space.test then box.space.test:drop() end + end) +end) + +-- Test a simple functional index with all combinations of +-- unique = {true, false} and is_nullable = {true, false}. +g.test_simple = function(cg) + cg.server:exec(function(is_unique, is_nullable) + local opts = {format = {{name = 'id', type = 'unsigned'}, + {name = 'f2', type = 'string', + is_nullable = is_nullable}}} + local s = box.schema.space.create('test', opts) + s:create_index('pk') + + opts = {is_sandboxed = true, + is_deterministic = true, + body = "function(tuple) return {tuple[2]} end"} + box.schema.func.create('func_ret_f2', opts) + + opts = {func = 'func_ret_f2', + unique = is_unique, + parts = {{field = 1, type = 'string', + is_nullable = is_nullable}}} + local idx = s:create_index('idx', opts) + + if is_nullable then + s:insert{1, box.NULL} + s:insert{2, box.NULL} + t.assert_equals(idx:select(), {{1}, {2}}) + end + + s:insert{3, 'aa'} + if is_unique then + t.assert_error_msg_content_equals( + 'Duplicate key exists in unique index "idx" in space "test" ' .. + 'with old tuple - [3, "aa"] and new tuple - [4, "aa"]', + function() s:insert{4, 'aa'} end) + t.assert_equals(idx:select{'aa'}, {{3, 'aa'}}) + else + s:insert{4, 'aa'} + t.assert_equals(idx:select{'aa'}, {{3, 'aa'}, {4, 'aa'}}) + end + end, {cg.params.is_unique, cg.params.is_nullable}) +end + +-- Test a multikey multipart functional index. +g.test_multikey_multipart = function(cg) + cg.server:exec(function(is_unique, is_nullable) + local s = box.schema.space.create('test') + s:format{{name = 'name', type = 'string'}, + {name = 'address', type = 'string'}, + {name = 'address2', type = 'string'}} + s:create_index('name', {parts = {{field = 1, type = 'string'}}}) + local lua_code = [[function(tuple) + local address = string.split(tuple[2]) + local ret = {} + for _, v in pairs(address) do + table.insert(ret, {utf8.upper(v), tuple[3]}) + end + return ret + end]] + box.schema.func.create('address', {body = lua_code, + is_sandboxed = true, + is_deterministic = true, + opts = {is_multikey = true}}) + s:create_index('addr', {unique = is_unique, + func = 'address', + parts = {{field = 1, type = 'string', + is_nullable = is_nullable}, + {field = 2, type = 'string', + is_nullable = is_nullable}}}) + s:insert{'James', 'SIS Building Lambeth London UK', '1'} + s:insert{'Sherlock', '221B Baker St Marylebone London NW1 6XE UK', '2'} + if is_unique then + t.assert_error_msg_content_equals( + 'Duplicate key exists in unique index "addr" in space "test"' .. + ' with old tuple - ["Sherlock", "221B Baker St Marylebone ' .. + 'London NW1 6XE UK", "2"] and new tuple - ["Sherlock2", "' .. + '221B Baker St Marylebone London NW1 6XE UK", "2"]', + function() + s:insert{'Sherlock2', + '221B Baker St Marylebone London NW1 6XE UK', '2'} + end) + else + s:insert{'Sherlock2', + '221B Baker St Marylebone London NW1 6XE UK', '2'} + end + end, {cg.params.is_unique, cg.params.is_nullable}) +end diff --git a/test/unit/key_def.c b/test/unit/key_def.c index b2cf0de1c246c59fc6d9a5a8c4c2f0bc9b1d2279..292cf6e2407a492c077ff18bcf24f389fc257bbc 100644 --- a/test/unit/key_def.c +++ b/test/unit/key_def.c @@ -134,6 +134,114 @@ test_key_def_new_func(const char *format, ...) return def; } +/** + * Checks that tuple_compare() -> func_index_compare() return value equals + * `expected`. + */ +static void +test_check_tuple_compare_func(struct key_def *cmp_def, + struct tuple *tuple_a, struct tuple *func_key_a, + struct tuple *tuple_b, struct tuple *func_key_b, + int expected) +{ + int r = tuple_compare(tuple_a, (hint_t)func_key_a, + tuple_b, (hint_t)func_key_b, cmp_def); + r = r > 0 ? 1 : r < 0 ? -1 : 0; + is(r, expected, "func_index_compare(%s/%s, %s/%s) = %d, expected %d", + tuple_str(tuple_a), tuple_str(func_key_a), + tuple_str(tuple_b), tuple_str(func_key_b), r, expected); +} + +static void +test_func_compare(void) +{ + plan(6); + header(); + + struct key_def *func_def = test_key_def_new_func( + "[{%s%u%s%s%s%b}{%s%u%s%s%s%b}]", + "field", 0, "type", "string", "is_nullable", 1, + "field", 1, "type", "string", "is_nullable", 1); + + struct key_def *pk_def = test_key_def_new( + "[{%s%u%s%s}]", + "field", 1, "type", "unsigned"); + + struct key_def *cmp_def = key_def_merge(func_def, pk_def); + /* Just like when `opts->is_unique == true`, see index_def_new(). */ + cmp_def->unique_part_count = func_def->part_count; + + struct testcase { + int expected_result; + struct tuple *tuple_a; + struct tuple *tuple_b; + struct tuple *func_key_a; + struct tuple *func_key_b; + }; + + struct testcase testcases[] = { + { + -1, /* func_key_a < func_key_b */ + test_tuple_new("[%s%u%s]", "--", 0, "--"), + test_tuple_new("[%s%u%s]", "--", 0, "--"), + test_tuple_new("[%sNIL]", "aa"), + test_tuple_new("[%s%s]", "aa", "bb"), + }, { + 1, /* func_key_a > func_key_b */ + test_tuple_new("[%s%u%s]", "--", 0, "--"), + test_tuple_new("[%s%u%s]", "--", 0, "--"), + test_tuple_new("[%s%s]", "aa", "bb"), + test_tuple_new("[%sNIL]", "aa"), + }, { + 0, /* func_key_a == func_key_b, pk not compared */ + test_tuple_new("[%s%u%s]", "--", 10, "--"), + test_tuple_new("[%s%u%s]", "--", 20, "--"), + test_tuple_new("[%s%s]", "aa", "bb"), + test_tuple_new("[%s%s]", "aa", "bb"), + }, { + -1, /* func_key_a == func_key_b, pk_a < pk_b */ + test_tuple_new("[%s%u%s]", "--", 30, "--"), + test_tuple_new("[%s%u%s]", "--", 40, "--"), + test_tuple_new("[%sNIL]", "aa"), + test_tuple_new("[%sNIL]", "aa"), + }, { + 1, /* func_key_a == func_key_b, pk_a > pk_b */ + test_tuple_new("[%s%u%s]", "--", 60, "--"), + test_tuple_new("[%s%u%s]", "--", 50, "--"), + test_tuple_new("[%sNIL]", "aa"), + test_tuple_new("[%sNIL]", "aa"), + }, { + 0, /* func_key_a == func_key_b, pk_a == pk_b */ + test_tuple_new("[%s%u%s]", "--", 70, "--"), + test_tuple_new("[%s%u%s]", "--", 70, "--"), + test_tuple_new("[%sNIL]", "aa"), + test_tuple_new("[%sNIL]", "aa"), + } + }; + + for (size_t i = 0; i < lengthof(testcases); i++) { + struct testcase *t = &testcases[i]; + test_check_tuple_compare_func(cmp_def, + t->tuple_a, t->func_key_a, + t->tuple_b, t->func_key_b, + t->expected_result); + } + + for (size_t i = 0; i < lengthof(testcases); i++) { + struct testcase *t = &testcases[i]; + tuple_delete(t->tuple_a); + tuple_delete(t->tuple_b); + tuple_delete(t->func_key_a); + tuple_delete(t->func_key_b); + } + key_def_delete(func_def); + key_def_delete(pk_def); + key_def_delete(cmp_def); + + footer(); + check_plan(); +} + /** * Checks that tuple_compare_with_key with cmp_def of functional index * returns the same result as comparison of concatenated func and primary keys. @@ -217,6 +325,8 @@ test_func_compare_with_key(void) for (unsigned int i = 0; i < lengthof(keys); i++) free(keys[i]); tuple_delete(func_key); + tuple_delete(tuple); + tuple_delete(model); key_def_delete(def); key_def_delete(pk_def); key_def_delete(cmp_def); @@ -350,9 +460,10 @@ test_tuple_validate_key_parts_raw(void) static int test_main(void) { - plan(3); + plan(4); header(); + test_func_compare(); test_func_compare_with_key(); test_tuple_extract_key_raw_slowpath_nullable(); test_tuple_validate_key_parts_raw();