From 1150adf2e5e5a341ebe3a0c2769717bec8b38141 Mon Sep 17 00:00:00 2001 From: Aleksandr Lyapunov <alyapunov@tarantool.org> Date: Tue, 1 Feb 2022 17:44:58 +0300 Subject: [PATCH] box: implement complex foreign keys Implement complext foreign keys addition to field foreign keys. They are quite similar to field foreign keys, the difference is: * The are set in space options instead of format field definition. * Several fields may be specified in relation. * By design field foreign keys are more optimal. One can set up foreign keys in space options: box.schema.space.create(.. {.., foreign_key=<foreign_key>}) where foreign_key can be of one of the following forms: foreign_key={space=..,field=..} foreign_key={<name1>={space=..,field=..}, ..} where field must be a table with local -> foreing fields mapping: field={local_field1=foreign_field1, ..} NO_DOC=see later commits NO_CHANGELOG=see later commits --- src/box/errcode.h | 1 + src/box/field_def.c | 3 +- src/box/lua/schema.lua | 79 ++- src/box/lua/space.cc | 70 ++- src/box/space.c | 10 +- src/box/space_def.c | 33 ++ src/box/tuple_constraint.c | 12 +- src/box/tuple_constraint.h | 31 +- src/box/tuple_constraint_def.c | 136 ++++- src/box/tuple_constraint_def.h | 31 +- src/box/tuple_constraint_fkey.c | 316 ++++++++++- src/box/tuple_constraint_fkey.h | 2 + test/box/error.result | 1 + .../gh_6436_complex_foreign_key_test.lua | 534 ++++++++++++++++++ 14 files changed, 1198 insertions(+), 61 deletions(-) create mode 100644 test/engine-luatest/gh_6436_complex_foreign_key_test.lua diff --git a/src/box/errcode.h b/src/box/errcode.h index 26ef5fc284..00a4b656f2 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -292,6 +292,7 @@ struct errcode_record { /*237 */_(ER_CREATE_FOREIGN_KEY, "Failed to create foreign key '%s' in space '%s': %s") \ /*238 */_(ER_FOREIGN_KEY_INTEGRITY, "Foreign key '%s' integrity check failed: %s") \ /*239 */_(ER_FIELD_FOREIGN_KEY_FAILED, "Foreign key constraint '%s' failed for field '%s': %s") \ + /*239 */_(ER_COMPLEX_FOREIGN_KEY_FAILED, "Foreign key constraint '%s' failed: %s") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/field_def.c b/src/box/field_def.c index 2cc0264a5b..1d1f4f513f 100644 --- a/src/box/field_def.c +++ b/src/box/field_def.c @@ -262,5 +262,6 @@ field_def_parse_foreign_key(const char **data, void *opts, struct field_def *def = (struct field_def *)opts; return tuple_constraint_def_decode_fkey(data, &def->constraint_def, &def->constraint_count, - region, errcode, field_no); + region, errcode, field_no, + false); } diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua index 883fb21cdb..154e98ed71 100644 --- a/src/box/lua/schema.lua +++ b/src/box/lua/schema.lua @@ -453,7 +453,11 @@ end -- Helper of normalize_foreign_key. -- Check and normalize one foreign key definition. -local function normalize_foreign_key_one(def, error_prefix) +-- If not is_complex, field is expected to be a numeric ID or string name of +-- foreign field. +-- If is_complex, field is expected to be a table with local field -> +-- foreign field mapping. +local function normalize_foreign_key_one(def, error_prefix, is_complex) if def.space == nil then box.error(box.error.ILLEGAL_PARAMS, error_prefix .. "foreign key: space must be specified") @@ -466,13 +470,51 @@ local function normalize_foreign_key_one(def, error_prefix) box.error(box.error.ILLEGAL_PARAMS, error_prefix .. "foreign key: space must be string or number") end - if type(def.field) ~= 'string' and type(def.field) ~= 'number' then - box.error(box.error.ILLEGAL_PARAMS, - error_prefix .. "foreign key: field must be string or number") - end - if type(def.field) == 'number' then - -- convert to zero-based index. - def.field = def.field - 1 + if not is_complex then + if type(def.field) ~= 'string' and type(def.field) ~= 'number' then + box.error(box.error.ILLEGAL_PARAMS, + error_prefix .. "foreign key: field must be string or number") + end + if type(def.field) == 'number' then + -- convert to zero-based index. + def.field = def.field - 1 + end + else + if type(def.field) ~= 'table' then + box.error(box.error.ILLEGAL_PARAMS, + error_prefix .. "foreign key: field must be a table " .. + "with local field -> foreign field mapping") + end + local count = 0 + local converted = {} + for k,v in pairs(def.field) do + count = count + 1 + if type(k) ~= 'string' and type(k) ~= 'number' then + box.error(box.error.ILLEGAL_PARAMS, + error_prefix .. "foreign key: local field must be " + .. "string or number") + end + if type(k) == 'number' then + -- convert to zero-based index. + k = k - 1 + end + if type(v) ~= 'string' and type(v) ~= 'number' then + box.error(box.error.ILLEGAL_PARAMS, + error_prefix .. "foreign key: foreign field must be " + .. "string or number") + end + if type(v) == 'number' then + -- convert to zero-based index. + v = v - 1 + end + converted[k] = v + end + if count < 1 then + box.error(box.error.ILLEGAL_PARAMS, + error_prefix .. "foreign key: field must be a table " .. + "with local field -> foreign field mapping") + end + def.field = setmap(converted) end if not box.space[def.space] then box.error(box.error.ILLEGAL_PARAMS, @@ -493,9 +535,13 @@ end -- Given definition @a fkey is expected to be one of: -- {space=.., field=..} -- {fkey_name={space=.., field=..}, } +-- If not is_complex, field is expected to be a numeric ID or string name of +-- foreign field. +-- If is_complex, field is expected to be a table with local field -> +-- foreign field mapping. -- In case of error box.error.ILLEGAL_PARAMS is raised, and @a error_prefix -- is added before string message. -local function normalize_foreign_key(fkey, error_prefix) +local function normalize_foreign_key(fkey, error_prefix, is_complex) if fkey == nil then return nil end @@ -507,7 +553,7 @@ local function normalize_foreign_key(fkey, error_prefix) if fkey.space ~= nil and fkey.field ~= nil and (type(fkey.space) ~= 'table' or type(fkey.field) ~= 'table') then -- the first, short form. - fkey = normalize_foreign_key_one(fkey, error_prefix) + fkey = normalize_foreign_key_one(fkey, error_prefix, is_complex) return {[box.space[fkey.space].name]=fkey} end -- the second, detailed form. @@ -523,7 +569,7 @@ local function normalize_foreign_key(fkey, error_prefix) error_prefix .. "foreign key definition must be a table " .. "with 'space' and 'field' members") end - v = normalize_foreign_key_one(v, error_prefix) + v = normalize_foreign_key_one(v, error_prefix, is_complex) result[k] = v end return result @@ -596,6 +642,7 @@ box.schema.space.create = function(name, options) is_sync = 'boolean', defer_deletes = 'boolean', constraint = 'string, table', + foreign_key = 'table', } local options_defaults = { engine = 'memtx', @@ -642,6 +689,7 @@ box.schema.space.create = function(name, options) check_param(format, 'format', 'table') format = update_format(format) local constraint = normalize_constraint(options.constraint, '') + local foreign_key = normalize_foreign_key(options.foreign_key, '', true) -- filter out global parameters from the options array local space_options = setmap({ group_id = options.is_local and 1 or nil, @@ -649,6 +697,7 @@ box.schema.space.create = function(name, options) is_sync = options.is_sync, defer_deletes = options.defer_deletes and true or nil, constraint = constraint, + foreign_key = foreign_key, }) _space:insert{id, uid, name, options.engine, options.field_count, space_options, format} @@ -744,6 +793,7 @@ local alter_space_template = { defer_deletes = 'boolean', name = 'string', constraint = 'string, table', + foreign_key = 'table', } box.schema.space.alter = function(space_id, options) @@ -797,6 +847,13 @@ box.schema.space.alter = function(space_id, options) flags.constraint = normalize_constraint(options.constraint, '') end + if options.foreign_key ~= nil then + if table.equals(options.foreign_key, {}) then + options.foreign_key = nil + end + flags.foreign_key = normalize_foreign_key(options.foreign_key, '', true) + end + tuple = tuple:totable() tuple[2] = owner tuple[3] = name diff --git a/src/box/lua/space.cc b/src/box/lua/space.cc index 5b52a153e2..e578c7cf77 100644 --- a/src/box/lua/space.cc +++ b/src/box/lua/space.cc @@ -243,7 +243,13 @@ lbox_push_space_constraint(struct lua_State *L, struct space *space, int i) { assert(i >= 0); struct tuple_format *fmt = space->format; - if (fmt->constraint_count == 0) { + uint32_t constraint_count = 0; + for (size_t k = 0; k < fmt->constraint_count; k++) { + struct tuple_constraint *c = &fmt->constraint[k]; + if (c->def.type == CONSTR_FUNC) + constraint_count++; + } + if (constraint_count == 0) { /* No constraints - no field. */ lua_pushnil(L); lua_setfield(L, i, "constraint"); @@ -252,12 +258,73 @@ lbox_push_space_constraint(struct lua_State *L, struct space *space, int i) lua_newtable(L); for (size_t k = 0; k < fmt->constraint_count; k++) { + if (fmt->constraint[k].def.type != CONSTR_FUNC) + continue; lua_pushnumber(L, fmt->constraint[k].def.func.id); lua_setfield(L, -2, fmt->constraint[k].def.name); } lua_setfield(L, i, "constraint"); } +/** + * Helper function of lbox_push_space_foreign_key. + * Push a value @a def to the top of lua stack @a L. + */ +static void +lbox_push_field_id(struct lua_State *L, + struct tuple_constraint_field_id *def) +{ + if (def->name_len == 0) + lua_pushnumber(L, def->id); + else + lua_pushstring(L, def->name); +} + +/** + * Create foreign_key field in lua space object, given by index i in lua stack. + * If the space has no foreign keys, there will be no foreign_key field. + */ +static void +lbox_push_space_foreign_key(struct lua_State *L, struct space *space, int i) +{ + assert(i >= 0); + struct tuple_format *fmt = space->format; + uint32_t foreign_key_count = 0; + for (size_t k = 0; k < fmt->constraint_count; k++) { + struct tuple_constraint *c = &fmt->constraint[k]; + if (c->def.type == CONSTR_FKEY) + foreign_key_count++; + } + if (foreign_key_count == 0) { + /* No foreign keys - no field. */ + lua_pushnil(L); + lua_setfield(L, i, "foreign_key"); + return; + } + + lua_newtable(L); + for (size_t k = 0; k < fmt->constraint_count; k++) { + struct tuple_constraint *c = &fmt->constraint[k]; + if (c->def.type != CONSTR_FKEY) + continue; + + lua_newtable(L); + lua_pushnumber(L, c->def.fkey.space_id); + lua_setfield(L, -2, "space"); + lua_newtable(L); + for (uint32_t j = 0; j < c->def.fkey.field_mapping_size; j++) { + struct tuple_constraint_fkey_field_mapping *m = + &c->def.fkey.field_mapping[j]; + lbox_push_field_id(L, &m->local_field); + lbox_push_field_id(L, &m->foreign_field); + lua_settable(L, -3); + } + lua_setfield(L, -2, "field"); + lua_setfield(L, -2, fmt->constraint[k].def.name); + } + lua_setfield(L, i, "foreign_key"); +} + /** * Make a single space available in Lua, * via box.space[] array. @@ -498,6 +565,7 @@ lbox_fillspace(struct lua_State *L, struct space *space, int i) lbox_push_ck_constraint(L, space, i); lbox_push_space_constraint(L, space, i); + lbox_push_space_foreign_key(L, space, i); lua_getfield(L, LUA_GLOBALSINDEX, "box"); lua_pushstring(L, "schema"); diff --git a/src/box/space.c b/src/box/space.c index 70c0a29394..16992ea7ed 100644 --- a/src/box/space.c +++ b/src/box/space.c @@ -141,8 +141,14 @@ space_init_constraints(struct space *space) constr->def.type == CONSTR_FKEY; if (constr->check != tuple_constraint_noop_check) continue; - if (tuple_constraint_func_init(constr, space) != 0) - return -1; + if (constr->def.type == CONSTR_FUNC) { + if (tuple_constraint_func_init(constr, space) != 0) + return -1; + } else { + assert(constr->def.type == CONSTR_FKEY); + if (tuple_constraint_fkey_init(constr, space, -1) != 0) + return -1; + } } for (uint32_t i = 0; i < tuple_format_field_count(format); i++) { struct tuple_field *field = tuple_format_field(format, i); diff --git a/src/box/space_def.c b/src/box/space_def.c index cabec04f63..0ae75dfeae 100644 --- a/src/box/space_def.c +++ b/src/box/space_def.c @@ -59,6 +59,15 @@ space_opts_parse_constraint(const char **data, void *vopts, struct region *region, uint32_t errcode, uint32_t field_no); +/** + * Callback to parse a value with 'foreign_key' key in msgpack space opts + * definition. See function definition below. + */ +static int +space_opts_parse_foreign_key(const char **data, void *vopts, + struct region *region, + uint32_t errcode, uint32_t field_no); + const struct opt_def space_opts_reg[] = { OPT_DEF("group_id", OPT_UINT32, struct space_opts, group_id), OPT_DEF("temporary", OPT_BOOL, struct space_opts, is_temporary), @@ -67,6 +76,7 @@ const struct opt_def space_opts_reg[] = { OPT_DEF("defer_deletes", OPT_BOOL, struct space_opts, defer_deletes), OPT_DEF("sql", OPT_STRPTR, struct space_opts, sql), OPT_DEF_CUSTOM("constraint", space_opts_parse_constraint), + OPT_DEF_CUSTOM("foreign_key", space_opts_parse_foreign_key), OPT_DEF_LEGACY("checks"), OPT_END, }; @@ -353,3 +363,26 @@ space_opts_parse_constraint(const char **data, void *vopts, &opts->constraint_count, region, errcode, field_no); } + +/** + * Parse foreign key array from msgpack. + * Used as callback to parse a value with 'foreign_key' key in space options. + * Move @a data msgpack pointer to the end of msgpack value. + * By convention @a opts must point to corresponding struct space_opts. + * Allocate a temporary constraint array on @a region and set pointer to it + * as field_def->constraint, also setting field_def->constraint_count. + * Return 0 on success or -1 on error (diag is set to @a errcode with + * reference to field by @a field_no). + */ +int +space_opts_parse_foreign_key(const char **data, void *vopts, + struct region *region, + uint32_t errcode, uint32_t field_no) +{ + /* Expected normal form of constraints: {name1={space=.., field=..}.. */ + struct space_opts *opts = (struct space_opts *)vopts; + return tuple_constraint_def_decode_fkey(data, &opts->constraint_def, + &opts->constraint_count, + region, errcode, field_no, + true); +} diff --git a/src/box/tuple_constraint.c b/src/box/tuple_constraint.c index 5c57f918b9..7baa75bcca 100644 --- a/src/box/tuple_constraint.c +++ b/src/box/tuple_constraint.c @@ -44,8 +44,11 @@ tuple_constraint_array_new(const struct tuple_constraint_def *defs, for (size_t i = 0; i < count; i++) { if (defs[i].type != CONSTR_FKEY) continue; + uint32_t field_count = defs[i].fkey.field_mapping_size; + if (field_count == 0) + field_count = 1; /* field foreign key */ size_t size = offsetof(struct tuple_constraint_fkey_data, - data[1]); + data[field_count]); grp_alloc_reserve_data(&all, size); } struct tuple_constraint *res = @@ -60,10 +63,13 @@ tuple_constraint_array_new(const struct tuple_constraint_def *defs, res[i].fkey = NULL; continue; } + uint32_t field_count = defs[i].fkey.field_mapping_size; + if (field_count == 0) + field_count = 1; /* field foreign key */ size_t size = offsetof(struct tuple_constraint_fkey_data, - data[1]); + data[field_count]); res[i].fkey = grp_alloc_create_data(&all, size); - res[i].fkey->field_count = 1; + res[i].fkey->field_count = field_count; } assert(grp_alloc_size(&all) == 0); diff --git a/src/box/tuple_constraint.h b/src/box/tuple_constraint.h index e26deb1a72..fbc0e5b06c 100644 --- a/src/box/tuple_constraint.h +++ b/src/box/tuple_constraint.h @@ -51,6 +51,18 @@ struct tuple_constraint_fkey_pair_data { * by name. */ int32_t local_field_no; + /** + * Offset of corresponding field pair in foreign index. Can be -1 if + * the index is not found. See tuple_constraint_fkey_data::data for + * more details. + */ + int16_t foreign_index_order; + /** + * Offset of corresponding field pair in local index. Can be -1 if + * the index is not found. See tuple_constraint_fkey_data::data for + * more details. + */ + int16_t local_index_order; }; /** @@ -73,7 +85,24 @@ struct tuple_constraint_fkey_data { */ uint32_t field_count; /** - * Array of data of each local/foreign field pair. + * Array of data of each local/foreign field pair or of index parts. + * First of all there are foreign_field_no and local_field_no members + * there, that declares correspondence of local and foreign tuple + * fields. The order of that pairs is unspecified and not even + * important: constraint checks are made by space index that is + * searched by a set of fields despite of order. Actually there two + * indexes - foreign (used for check before addition to local space) + * and local (used for check before deletion from foreign space). + * Note that for the first check (in foreign index) fields of local + * tuple (from local space) are used as a key, while for the second + * check (in local index) - fields of foreign tuple are needed. Thus, + * to extract a correct key, we have to know to which field pair each + * index's key_def part correspond to. This mapping is stored in this + * array in foreign_index_order and local_index_order members. For + * example for foreign index query we need take the following fields + * of local tuple: + * data[data[0].foreign_index_order],data[data[1].foreign_index_order].. + * Symmetrically for local index. */ struct tuple_constraint_fkey_pair_data data[]; }; diff --git a/src/box/tuple_constraint_def.c b/src/box/tuple_constraint_def.c index 575b1007a7..576db89967 100644 --- a/src/box/tuple_constraint_def.c +++ b/src/box/tuple_constraint_def.c @@ -35,7 +35,24 @@ tuple_constraint_def_cmp_fkey(const struct tuple_constraint_fkey_def *def1, { if (def1->space_id != def2->space_id) return def1->space_id < def2->space_id ? -1 : 1; - return field_id_cmp(&def1->field, &def2->field); + if (def1->field_mapping_size != def2->field_mapping_size) + return def1->field_mapping_size < def2->field_mapping_size ? -1 + : 1; + if (def1->field_mapping_size == 0) + return field_id_cmp(&def1->field, &def2->field); + + for (uint32_t i = 0; i < def1->field_mapping_size; i++) { + int rc; + rc = field_id_cmp(&def1->field_mapping[i].local_field, + &def2->field_mapping[i].local_field); + if (rc != 0) + return rc; + rc = field_id_cmp(&def1->field_mapping[i].foreign_field, + &def2->field_mapping[i].foreign_field); + if (rc != 0) + return rc; + } + return 0; } int @@ -173,11 +190,54 @@ field_id_decode(const char **data, struct tuple_constraint_field_id *def, return 0; } +/** + * Helper function of tuple_constraint_def_decode_fkey. + * Decode foreign key field mapping, that is expected to be MP_MAP with + * local field (id or name) -> foreign field (id or name) correspondence. + */ +static int +field_mapping_decode(const char **data, + struct tuple_constraint_fkey_def *fkey, + struct region *region, uint32_t errcode, uint32_t field_no) +{ + if (mp_typeof(**data) != MP_MAP) { + diag_set(ClientError, errcode, field_no, + "field mapping is expected to be a map"); + return -1; + } + uint32_t mapping_size = mp_decode_map(data); + if (mapping_size == 0) { + diag_set(ClientError, errcode, field_no, + "field mapping is expected to be a map"); + return -1; + } + fkey->field_mapping_size = mapping_size; + size_t sz; + fkey->field_mapping = + region_alloc_array(region, + struct tuple_constraint_fkey_field_mapping, + mapping_size, &sz); + if (fkey->field_mapping == NULL) { + diag_set(OutOfMemory, sz, "region", "field mapping"); + return -1; + } + for (uint32_t i = 0 ; i < 2 * mapping_size; i++) { + struct tuple_constraint_field_id *def = i % 2 == 0 ? + &fkey->field_mapping[i / 2].local_field : + &fkey->field_mapping[i / 2].foreign_field; + int rc = field_id_decode(data, def, region, errcode, field_no); + if (rc != 0) + return rc; + } + return 0; +} + int tuple_constraint_def_decode_fkey(const char **data, struct tuple_constraint_def **def, uint32_t *count, struct region *region, - uint32_t errcode, uint32_t field_no) + uint32_t errcode, uint32_t field_no, + bool is_complex) { /* * Expected normal form of foreign keys: {name1=data1, name2=data2..}, @@ -232,6 +292,7 @@ tuple_constraint_def_decode_fkey(const char **data, "is expected to be a map"); return -1; } + new_def[i].fkey.field_mapping_size = 0; uint32_t def_size = mp_decode_map(data); bool has_space = false, has_field = false; for (size_t j = 0; j < def_size; j++) { @@ -260,13 +321,16 @@ tuple_constraint_def_decode_fkey(const char **data, } int rc; struct tuple_constraint_fkey_def *fk = &new_def[i].fkey; - if (is_space) { + if (is_space) rc = space_id_decode(data, &fk->space_id, errcode, field_no); - } else { + else if (!is_complex) rc = field_id_decode(data, &fk->field, region, errcode, field_no); - } + else + rc = field_mapping_decode(data, fk, + region, errcode, + field_no); if (rc != 0) return rc; } @@ -283,6 +347,15 @@ tuple_constraint_def_decode_fkey(const char **data, /** * Copy tuple_constraint_field_id object, allocating data on given allocator. */ +static void +field_id_reserve(const struct tuple_constraint_field_id *def, + struct grp_alloc *all) +{ + /* Reservation is required only for strings. */ + if (def->name_len != 0) + grp_alloc_reserve_str(all, def->name_len); +} + static void field_id_copy(struct tuple_constraint_field_id *dst, const struct tuple_constraint_field_id *src, @@ -296,6 +369,47 @@ field_id_copy(struct tuple_constraint_field_id *dst, dst->name = ""; } +/** + * Reserve memory for field mapping of given constraint definition @a def + * on an allocator @all. + */ +static void +field_mapping_reserve(const struct tuple_constraint_fkey_def *def, + struct grp_alloc *all) +{ + assert(def->field_mapping_size != 0); + size_t bytes = def->field_mapping_size * sizeof(def->field_mapping[0]); + grp_alloc_reserve_data(all, bytes); + for (uint32_t i = 0; i < def->field_mapping_size; i++) { + const struct tuple_constraint_fkey_field_mapping *f = + &def->field_mapping[i]; + field_id_reserve(&f->local_field, all); + field_id_reserve(&f->foreign_field, all); + } +} + +/** + * Copy field mapping array from one definition to another. + */ +static void +field_mapping_copy(struct tuple_constraint_fkey_def *dst, + const struct tuple_constraint_fkey_def *src, + struct grp_alloc *all) +{ + assert(src->field_mapping_size != 0); + dst->field_mapping_size = src->field_mapping_size; + size_t bytes = src->field_mapping_size * sizeof(dst->field_mapping[0]); + dst->field_mapping = grp_alloc_create_data(all, bytes); + for (uint32_t i = 0; i < src->field_mapping_size; i++) { + struct tuple_constraint_fkey_field_mapping *d = + &dst->field_mapping[i]; + const struct tuple_constraint_fkey_field_mapping *s = + &src->field_mapping[i]; + field_id_copy(&d->local_field, &s->local_field, all); + field_id_copy(&d->foreign_field, &s->foreign_field, all); + } +} + /** * Reserve strings needed for given constraint definition @a dev in given * string @a bank. @@ -306,8 +420,10 @@ tuple_constraint_def_reserve(const struct tuple_constraint_def *def, { grp_alloc_reserve_str(all, def->name_len); if (def->type == CONSTR_FKEY) { - if (def->fkey.field.name_len != 0) - grp_alloc_reserve_str(all, def->fkey.field.name_len); + if (def->fkey.field_mapping_size == 0) + field_id_reserve(&def->fkey.field, all); + else + field_mapping_reserve(&def->fkey, all); } } @@ -328,7 +444,11 @@ tuple_constraint_def_copy(struct tuple_constraint_def *dst, } else { assert(src->type == CONSTR_FKEY); dst->fkey.space_id = src->fkey.space_id; - field_id_copy(&dst->fkey.field, &src->fkey.field, all); + dst->fkey.field_mapping_size = 0; + if (src->fkey.field_mapping_size == 0) + field_id_copy(&dst->fkey.field, &src->fkey.field, all); + else + field_mapping_copy(&dst->fkey, &src->fkey, all); } } diff --git a/src/box/tuple_constraint_def.h b/src/box/tuple_constraint_def.h index cfde33aabd..b356ac7dc2 100644 --- a/src/box/tuple_constraint_def.h +++ b/src/box/tuple_constraint_def.h @@ -43,14 +43,34 @@ struct tuple_constraint_field_id { const char *name; }; +/** + * Definition of one pair in foreign key field mapping. + * Used only for complex foreign keys. + */ +struct tuple_constraint_fkey_field_mapping { + /** Field in local space. */ + struct tuple_constraint_field_id local_field; + /** Field in foreign space. */ + struct tuple_constraint_field_id foreign_field; +}; + /** * Definition of a foreign key. */ struct tuple_constraint_fkey_def { /** Definition of space. */ uint32_t space_id; - /** Definition of field. */ - struct tuple_constraint_field_id field; + /** + * Number of records in field map. Nonzero only for complex foreign + * keys. Zero for field foreign keys. + */ + uint32_t field_mapping_size; + union { + /** Definition of field. */ + struct tuple_constraint_field_id field; + /** Field mapping. */ + struct tuple_constraint_fkey_field_mapping *field_mapping; + }; }; /** @@ -105,6 +125,9 @@ tuple_constraint_def_decode(const char **data, /** * Parse constraint array from msgpack @a *data with the following format: * {foreign_key_name={space=.., field=...},...} + * If @a is_complex is false, the field is parsed as ID or name. + * If @a is_complex is true, the field is parsed as msgpack map of + * local (as ID or name) to foreign (ad ID or name) field pairs. * Allocate a temporary constraint array on @a region and save it in @a def. * If there are some constraints already (*def != NULL, *count != 0) then * append the array with parsed constraints. @@ -121,7 +144,9 @@ int tuple_constraint_def_decode_fkey(const char **data, struct tuple_constraint_def **def, uint32_t *count, struct region *region, - uint32_t errcode, uint32_t field_no); + uint32_t errcode, uint32_t field_no, + bool is_complex); + /** * Allocate a single memory block needed for given @a count of constraint * definitions, including strings in them. Fill the block with strings and diff --git a/src/box/tuple_constraint_fkey.c b/src/box/tuple_constraint_fkey.c index bafa029b3b..a451dc58cc 100644 --- a/src/box/tuple_constraint_fkey.c +++ b/src/box/tuple_constraint_fkey.c @@ -15,6 +15,14 @@ #include "tt_static.h" #include "trivia/util.h" +/** Static buffer size for extraction of complex keys from tuples. */ +enum { + COMPLEX_KEY_BUFFER_SIZE = 4096, +}; + +/** Static buffer for extraction of complex keys from tuples. */ +static char complex_key_buffer[COMPLEX_KEY_BUFFER_SIZE]; + /** * Find field number in @a space by field def in constraint. * Return -1 if not found. @@ -65,15 +73,21 @@ fkey_update_index_common(struct tuple_constraint *constr, bool is_foreign) */ uint32_t j; for (j = 0; j < field_count; j++) { - uint32_t field_no = is_foreign ? + int32_t field_no = is_foreign ? constr->fkey->data[j].foreign_field_no : constr->fkey->data[j].local_field_no; + assert(field_no >= 0); uint32_t k; for (k = 0; k < field_count; k++) - if (parts[k].fieldno == field_no) + if (parts[k].fieldno == (uint32_t)field_no) break; if (k == field_count) break; /* Not found. */ + int16_t *order; + order = is_foreign ? + &constr->fkey->data[k].foreign_index_order : + &constr->fkey->data[k].local_index_order; + *order = j; } if (j != field_count) continue; /* Not all found. */ @@ -113,12 +127,127 @@ field_foreign_key_failed(const struct tuple_constraint *constr, const struct tuple_field *field, const char *message) { - const char *field_path = tuple_field_path(field, constr->space->format); - struct error *err = diag_set(ClientError, ER_FIELD_FOREIGN_KEY_FAILED, - constr->def.name, field_path, message); + struct error *err; + const char *field_path = NULL; + if (field != NULL) { + field_path = tuple_field_path(field, constr->space->format); + err = diag_set(ClientError, ER_FIELD_FOREIGN_KEY_FAILED, + constr->def.name, field_path, message); + } else { + err = diag_set(ClientError, ER_COMPLEX_FOREIGN_KEY_FAILED, + constr->def.name, message); + } error_set_str(err, "name", constr->def.name); - error_set_str(err, "field_path", field_path); - error_set_uint(err, "field_id", field->id); + if (field != NULL) { + error_set_str(err, "field_path", field_path); + error_set_uint(err, "field_id", field->id); + } +} + +/** + * Auxiliary data structure that is used for complex key extraction from tuple. + */ +struct key_info { + /** Index of key part in key definition. */ + uint32_t index_order; + /** Field number of key part. */ + uint32_t field_no; + /** Msgpack data of that part in tuple. */ + const char *mp_data; + /** Size of msgpack data of that part in tuple. */ + size_t mp_data_size; +}; + +/** Sort by index_order compare function. */ +static int +key_info_by_order(const void *ptr1, const void *ptr2) +{ + const struct key_info *info1 = (const struct key_info *)ptr1; + const struct key_info *info2 = (const struct key_info *)ptr2; + return info1->index_order < info2->index_order ? -1 : + info1->index_order > info2->index_order; +} + +/** Sort by field_no compare function. */ +static int +key_info_by_field_no(const void *ptr1, const void *ptr2) +{ + const struct key_info *info1 = (const struct key_info *)ptr1; + const struct key_info *info2 = (const struct key_info *)ptr2; + return info1->field_no < info2->field_no ? -1 : + info1->field_no > info2->field_no; +} + +/** + * Get of extract key for foreign index from local tuple by given as @a mp_data. + * Simply return mp_data for field foreign key - it is the field itself. + * For complex foreign keys collect field in one contiguous buffer. + * Try to place resulting key in @a *buffer, that must be a buffer of size + * COMPLEX_KEY_BUFFER_SIZE. If there's not enough space - allocate needed + * using xmalloc - that pointer is returned via @a buffer. Thus if the pointer + * is changed - a user of that function must free() the buffer after usage. + * Return pointer to ready-to-use key in any case. + */ +static const char * +get_or_extract_key_mp(const struct tuple_constraint *constr, + struct key_def *def, char **buffer, const char *mp_data) +{ + if (constr->def.fkey.field_mapping_size == 0) + return mp_data; + + assert(def->part_count == constr->def.fkey.field_mapping_size); + const uint32_t info_count = def->part_count; + struct key_info info[info_count]; + + /* Collect fields_no in index order. */ + for (uint32_t i = 0; i < info_count; ++i) { + info[i].index_order = i; + int16_t pair_no = constr->fkey->data[i].foreign_index_order; + info[i].field_no = constr->fkey->data[pair_no].local_field_no; + } + + /* Reorder by fields_no, traverse tuple and collect fields. */ + qsort(info, def->part_count, sizeof(info[0]), key_info_by_field_no); + assert(mp_typeof(*mp_data) == MP_ARRAY); + uint32_t tuple_size = mp_decode_array(&mp_data); + uint32_t info_pos = 0; + size_t total_size = 0; + for (uint32_t i = 0; i < tuple_size; i++) { + const char *mp_data_end = mp_data; + mp_next(&mp_data_end); + + while (i == info[info_pos].field_no) { + info[info_pos].mp_data = mp_data; + info[info_pos].mp_data_size = mp_data_end - mp_data; + total_size += info[info_pos].mp_data_size; + info_pos++; + if (info_pos == info_count) + break; + } + + if (info_pos == info_count) + break; + + mp_data = mp_data_end; + } + + if (info_pos != info_count) + return NULL; /* End of tuple reached unexpectedly. */ + + /* Allocate of necessary. */ + if (total_size > COMPLEX_KEY_BUFFER_SIZE) + *buffer = xmalloc(total_size); + char *key = *buffer; + char *w_pos = key; + + /* Reorder back to index order and join fields in one buffer. */ + qsort(info, def->part_count, sizeof(info[0]), key_info_by_order); + for (uint32_t i = 0; i < info_count; ++i) { + memcpy(w_pos, info[i].mp_data, info[i].mp_data_size); + w_pos += info[i].mp_data_size; + } + + return key; } /** @@ -130,7 +259,7 @@ tuple_constraint_fkey_check(const struct tuple_constraint *constr, const struct tuple_field *field) { (void)mp_data_end; - assert(field != NULL); + assert((constr->def.fkey.field_mapping_size == 0) == (field != NULL)); struct space *foreign_space = constr->space_cache_holder.space; if (recovery_state <= FINAL_RECOVERY) { @@ -150,29 +279,47 @@ tuple_constraint_fkey_check(const struct tuple_constraint *constr, "foreign index was not found"); return -1; } + for (uint32_t i = 0; i < constr->fkey->field_count; i++) { + if (constr->fkey->data[i].local_field_no < 0) { + field_foreign_key_failed(constr, field, + "wrong local field name"); + return -1; + } + } struct index *index = foreign_space->index[constr->fkey->foreign_index]; struct key_def *key_def = index->def->key_def; uint32_t part_count = constr->fkey->field_count; assert(constr->fkey->field_count == key_def->part_count); - const char *key = mp_data; + char *key_buffer = complex_key_buffer; + const char *key = get_or_extract_key_mp(constr, key_def, + &key_buffer, mp_data); + if (key == NULL) { + field_foreign_key_failed(constr, field, "extract key failed"); + return -1; + } + int rc = -1; const char *unused; if (key_validate_parts(key_def, key, part_count, false, &unused) != 0) { field_foreign_key_failed(constr, field, "wrong key type"); - return -1; + goto done; } struct tuple *tuple = NULL; if (index_get(index, key, part_count, &tuple) != 0) { field_foreign_key_failed(constr, field, "index get failed"); - return -1; + goto done; } if (tuple == NULL) { field_foreign_key_failed(constr, field, "foreign tuple was not found"); - return -1; + goto done; } - return 0; + rc = 0; +done: + if (key_buffer != complex_key_buffer) + free(key_buffer); + return rc; } /** @@ -187,17 +334,73 @@ foreign_key_integrity_failed(const struct tuple_constraint *constr, error_set_str(err, "name", constr->def.name); } +/** + * Get of extract key for local index from foreign @a tuple. + * For field foreign key - return pointer to the field inside of tuple. + * For complex foreign keys collect field in one contiguous buffer. + * Try to place resulting key in @a *buffer, that must be a buffer of size + * COMPLEX_KEY_BUFFER_SIZE. If there's not enough space - allocate needed + * using xmalloc - that pointer is returned via @a buffer. Thus if the pointer + * is changed - a user of that function must free() the buffer after usage. + * Return pointer to ready-to-use key in any case. + */ +static const char * +get_or_extract_key_tuple(const struct tuple_constraint *constr, + struct key_def *def, char **buffer, + struct tuple *tuple) +{ + if (constr->def.fkey.field_mapping_size == 0) { + assert(constr->fkey->field_count == 1); + return tuple_field(tuple, + constr->fkey->data[0].foreign_field_no); + } + + assert(def->part_count == constr->def.fkey.field_mapping_size); + const uint32_t info_count = def->part_count; + struct key_info info[info_count]; + + /* Traverse fields and calculate total size. */ + size_t total_size = 0; + for (uint32_t i = 0; i < info_count; ++i) { + int16_t pair_no = constr->fkey->data[i].local_index_order; + int32_t field_no = constr->fkey->data[pair_no].foreign_field_no; + const char *field = tuple_field(tuple, field_no); + if (field == NULL || *field == MP_NIL) + return NULL; + info[i].mp_data = field; + mp_next(&field); + info[i].mp_data_size = field - info[i].mp_data; + total_size += info[i].mp_data_size; + } + + /* Allocate of necessary. */ + if (total_size > COMPLEX_KEY_BUFFER_SIZE) + *buffer = xmalloc(total_size); + char *key = *buffer; + char *w_pos = key; + + /* Join fields in one buffer. */ + for (uint32_t i = 0; i < info_count; ++i) { + memcpy(w_pos, info[i].mp_data, info[i].mp_data_size); + w_pos += info[i].mp_data_size; + } + + return key; +} + int tuple_constraint_fkey_check_delete(const struct tuple_constraint *constr, struct tuple *deleted_tuple, struct tuple *replaced_with_tuple) { assert(deleted_tuple != NULL); - int32_t foreign_field_no = constr->fkey->data->foreign_field_no; - if (foreign_field_no < 0) { - foreign_key_integrity_failed(constr, - "wrong foreign field name"); - return -1; + for (uint32_t i = 0; i < constr->fkey->field_count; i++) { + if (constr->fkey->data[i].foreign_field_no < 0) { + foreign_key_integrity_failed(constr, + "wrong foreign " + "field name"); + return -1; + } } if (replaced_with_tuple != NULL) { /* @@ -240,28 +443,37 @@ tuple_constraint_fkey_check_delete(const struct tuple_constraint *constr, uint32_t part_count = constr->fkey->field_count; assert(constr->fkey->field_count == key_def->part_count); - const char *key = tuple_field(deleted_tuple, foreign_field_no); + char *key_buffer = complex_key_buffer; + const char *key = get_or_extract_key_tuple(constr, key_def, + &key_buffer, deleted_tuple); + if (key == NULL || mp_typeof(*key) == MP_NIL) { - /* No field - nobody can be bound to it.*/ + /* No field(s) - nobody can be bound to them.*/ return 0; } + int rc = -1; const char *unused; if (key_validate_parts(key_def, key, part_count, false, &unused) != 0) { foreign_key_integrity_failed(constr, "wrong key type"); - return -1; + goto done; } struct tuple *found_tuple; if (index->def->opts.is_unique ? index_get(index, key, part_count, &found_tuple) : index_min(index, key, part_count, &found_tuple) != 0) - return -1; + goto done; if (found_tuple != NULL) { foreign_key_integrity_failed(constr, "tuple is referenced"); - return -1; + goto done; } - return 0; + rc = 0; + +done: + if (key_buffer != complex_key_buffer) + free(key_buffer); + return rc; } /** @@ -286,11 +498,54 @@ tuple_constraint_fkey_update_foreign(struct tuple_constraint *constraint) { struct space *space = constraint->space_cache_holder.space; constraint->fkey->foreign_index = -1; - assert(constraint->fkey->field_count == 1); - constraint->fkey->data[0].foreign_field_no = - find_field_no_by_def(space, &constraint->def.fkey.field); - if (constraint->fkey->data[0].foreign_field_no >= 0) - fkey_update_foreign_index(constraint); + uint32_t field_mapping_size = constraint->def.fkey.field_mapping_size; + if (field_mapping_size == 0) { + assert(constraint->fkey->field_count == 1); + constraint->fkey->data[0].foreign_field_no = + find_field_no_by_def(space, + &constraint->def.fkey.field); + if (constraint->fkey->data[0].foreign_field_no >= 0) + fkey_update_foreign_index(constraint); + return; + } + for (uint32_t i = 0; i < field_mapping_size; i++) { + struct tuple_constraint_field_id *f = + &constraint->def.fkey.field_mapping[i].foreign_field; + int32_t field_no = find_field_no_by_def(space, f); + constraint->fkey->data[i].foreign_field_no = field_no; + if (field_no < 0) + return; + } + fkey_update_foreign_index(constraint); +} + +/** + * Find and set local_field_no amd local_index fkey member of @a constraint. + * If something was not found - local_index is set to -1. + */ +static void +tuple_constraint_fkey_update_local(struct tuple_constraint *constraint, + int32_t field_no) +{ + struct space *space = constraint->space; + constraint->fkey->local_index = -1; + uint32_t field_mapping_size = constraint->def.fkey.field_mapping_size; + if (field_mapping_size == 0) { + assert(constraint->fkey->field_count == 1); + constraint->fkey->data[0].local_field_no = field_no; + assert(field_no >= 0); + fkey_update_local_index(constraint); + return; + } + for (uint32_t i = 0; i < field_mapping_size; i++) { + struct tuple_constraint_field_id *f = + &constraint->def.fkey.field_mapping[i].local_field; + field_no = find_field_no_by_def(space, f); + constraint->fkey->data[i].local_field_no = field_no; + if (field_no < 0) + return; + } + fkey_update_local_index(constraint); } /** @@ -314,8 +569,7 @@ tuple_constraint_fkey_init(struct tuple_constraint *constr, { assert(constr->def.type == CONSTR_FKEY); constr->space = space; - constr->fkey->data[0].local_field_no = field_no; - fkey_update_local_index(constr); + tuple_constraint_fkey_update_local(constr, field_no); struct space *foreign_space; foreign_space = space_by_id(constr->def.fkey.space_id); diff --git a/src/box/tuple_constraint_fkey.h b/src/box/tuple_constraint_fkey.h index 2b5c20de70..e92501fd9b 100644 --- a/src/box/tuple_constraint_fkey.h +++ b/src/box/tuple_constraint_fkey.h @@ -17,6 +17,8 @@ struct space; /** * Initialize @a constraint assuming that it is a foreign key. + * If this is a field constraint, @a field_no must be that field's index. + * If this is a complex constraint, @a field_no must be -1. */ int tuple_constraint_fkey_init(struct tuple_constraint *constraint, diff --git a/test/box/error.result b/test/box/error.result index cac4e26c9c..74c6b37004 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -458,6 +458,7 @@ t; | 237: box.error.CREATE_FOREIGN_KEY | 238: box.error.FOREIGN_KEY_INTEGRITY | 239: box.error.FIELD_FOREIGN_KEY_FAILED + | 240: box.error.COMPLEX_FOREIGN_KEY_FAILED | ... test_run:cmd("setopt delimiter ''"); diff --git a/test/engine-luatest/gh_6436_complex_foreign_key_test.lua b/test/engine-luatest/gh_6436_complex_foreign_key_test.lua new file mode 100644 index 0000000000..78a1904ef1 --- /dev/null +++ b/test/engine-luatest/gh_6436_complex_foreign_key_test.lua @@ -0,0 +1,534 @@ +-- https://github.com/tarantool/tarantool/issues/6436 Foreign keys +local server = require('test.luatest_helpers.server') +local t = require('luatest') +local g = t.group('gh-6436-foreign-key-test', {{engine = 'memtx'}, {engine = 'vinyl'}}) + +g.before_all(function(cg) + cg.server = server:new({alias = 'master'}) + cg.server:start() +end) + +g.after_all(function(cg) + cg.server:stop() + cg.server = nil +end) + +g.before_each(function() +end) + +g.after_each(function(cg) + cg.server:exec(function() + if box.space.city then box.space.city:drop() end + if box.space.country then box.space.country:drop() end + if box.space.user then box.space.user:drop() end + if box.space.card then box.space.card:drop() end + end) +end) + +-- Test with wrong complex foreign key definitions. +g.test_bad_complex_foreign_key = function(cg) + local engine = cg.params.engine + + cg.server:exec(function(engine) + local t = require('luatest') + local fmt = {{'planet_id','unsigned'}, {'country_id','unsigned'}, {'name'}} + local country = box.schema.create_space('country', {engine=engine, format=fmt}) + country:create_index('pk', {parts={{'planet_id'},{'country_id'}}}) + local fmt = {{'city_id'}, {'p_id'}, {'c_id'}} + local function space_opts(foreign_key) + return {engine=engine, format=fmt, foreign_key=foreign_key} + end + local opts = space_opts({space=false,field={}}) + t.assert_error_msg_content_equals( + "Illegal parameters, foreign key: space must be string or number", + function() box.schema.create_space('city', opts) end + ) + local opts = space_opts({space='country',field='country_id'}) + t.assert_error_msg_content_equals( + "Illegal parameters, foreign key: field must be a table with local field -> foreign field mapping", + function() box.schema.create_space('city', opts) end + ) + opts = space_opts({space='country',field={}}) + t.assert_error_msg_content_equals( + "Illegal parameters, foreign key: field must be a table with local field -> foreign field mapping", + function() box.schema.create_space('city', opts) end + ) + opts = space_opts({space='country',field={[false]='country_id'}}) + t.assert_error_msg_content_equals( + "Illegal parameters, foreign key: local field must be string or number", + function() box.schema.create_space('city', opts) end + ) + opts = space_opts({space='country',field={c_id=false}}) + t.assert_error_msg_content_equals( + "Illegal parameters, foreign key: foreign field must be string or number", + function() box.schema.create_space('city', opts) end + ) + opts = space_opts({cntr={space='country',field={p_id='planet_id', c_id='country_id'}}}) + box.schema.create_space('city', opts) + t.assert_equals(box.space.city.foreign_key, + { cntr = {field = {c_id = "country_id", p_id = "planet_id"}, space = country.id} } + ) + end, {engine}) +end + +-- Test with complex foreign key by primary index. +g.test_complex_foreign_key_primary = function(cg) + local engine = cg.params.engine + + cg.server:exec(function(engine) + local t = require('luatest') + local country = box.schema.create_space('country', {engine=engine}) + country:create_index('pk', {parts={{1},{2}}}) + country:replace{1, 11, 'Russia'} + country:replace{1, 12, 'France'} + + local function city_space_opts(foreign_key) + local fmt = {{name='id', type='unsigned'}, + {name='p_id', type='unsigned'}, + {name='c_id', type='unsigned'}} + return {engine=engine, format=fmt, foreign_key=foreign_key} + end + + local fkey = {space='country',field={p_id='planet_id', c_id='country_id'}} + local city = box.schema.create_space('city', city_space_opts(fkey)) + -- Note that the foreign_key was normalized + t.assert_equals(box.space.city.foreign_key, + { country = {field = {c_id = "country_id", p_id = "planet_id"}, space = country.id} } + ) + city:create_index('pk') + + t.assert_equals(country:select{}, {{1, 11, 'Russia'}, {1, 12, 'France'}}) + t.assert_error_msg_content_equals( + "Can't modify space 'country': space is referenced by foreign key", + function() country:drop() end + ) + t.assert_error_msg_content_equals( + "Foreign key 'country' integrity check failed: wrong foreign field name", + function() country:delete{1, 11} end + ) + t.assert_equals(country:select{}, {{1, 11, 'Russia'}, {1, 12, 'France'}}) + t.assert_error_msg_content_equals( + "Foreign key constraint 'country' failed: foreign index was not found", + function() city:replace{1, 1, 11, 'Moscow'} end + ) + local fmt = {{'planet_id','unsigned'}, {'country_id','unsigned'}, {'name'}} + country:format(fmt) + t.assert_error_msg_content_equals( + "Foreign key constraint 'country' failed: foreign tuple was not found", + function() city:replace{1, 1, 500, 'Moscow'} end + ) + city:replace{21, 1, 11, 'Moscow'} + end, {engine}) + + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local city = box.space.city + city:replace{22, 1, 11, 'Tomsk'} + t.assert_equals(city:select{}, {{21, 1, 11, 'Moscow'}, {22, 1, 11, 'Tomsk'}}) + end, {engine}) + + cg.server:eval('box.snapshot()') + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local country = box.space.country + local city = box.space.city + t.assert_error_msg_content_equals( + "Foreign key 'country' integrity check failed: index was not found", + function() country:delete{1, 12} end + ) + city:create_index('country', {parts={{'p_id'},{'c_id'}},unique=false}) + country:delete{1, 12} + t.assert_error_msg_content_equals( + "Foreign key 'country' integrity check failed: tuple is referenced", + function() country:delete{1, 11} end + ) + city:delete{21} + city:delete{22} + country:delete{1, 11} + city:drop() + country:drop() + end, {engine}) +end + +-- Test with foreign key by secondary index and some variations. +g.test_complex_foreign_key_secondary = function(cg) + local engine = cg.params.engine + + cg.server:exec(function(engine) + local t = require('luatest') + local country_fmt = {{name='id', type='unsigned'}, + {name='universe_id', type='unsigned'}, + {name='planet_name', type='string'}, + {name='country_code', type='string'}, + {name='name', type='string'}} + --Note: format is not set. + local country = box.schema.create_space('country', {engine=engine}) + country:create_index('pk') + country:replace{100, 1, 'earth', 'ru', 'Russia'} + country:replace{101, 1, 'earth', 'rf', 'France'} + + local function city_space_opts(foreign_key) + local fmt = {{name='id', type='unsigned'}, + {name='p', type='string'}, + {name='u', type='unsigned'}, + {name='c', type='string'}, + {name='name', type='string'}} + return {engine=engine, format=fmt, foreign_key=foreign_key} + end + local fkey = {cntr = {space='country', + field={c='country_code', + u='universe_id', + p='planet_name'}}} + local city = box.schema.create_space('city', city_space_opts(fkey)) + fkey.cntr.space = country.id + t.assert_equals(city.foreign_key, fkey); + city:create_index('pk') + + t.assert_equals(country:select{}, {{100, 1, 'earth', 'ru', 'Russia'}, + {101, 1, 'earth', 'rf', 'France'}}) + t.assert_error_msg_content_equals( + "Can't modify space 'country': space is referenced by foreign key", + function() country:drop() end + ) + t.assert_equals(country:select{}, {{100, 1, 'earth', 'ru', 'Russia'}, + {101, 1, 'earth', 'rf', 'France'}}) + t.assert_error_msg_content_equals( + "Foreign key constraint 'cntr' failed: foreign index was not found", + function() city:replace{21, 'earth', 1, 'ru', 'Moscow'} end + ) + country:format(country_fmt) + t.assert_error_msg_content_equals( + "Foreign key constraint 'cntr' failed: foreign index was not found", + function() city:replace{21, 'earth', 1, 'ru', 'Moscow'} end + ) + country:create_index('name1', {parts={{'universe_id'}}, + unique=false}) + country:create_index('name2', {parts={{'country_code'}, + {'universe_id'}}, + unique=false}) + country:create_index('name3', {parts={{'planet_name'}, + {'country_code'}, + {'universe_id'}}}) + t.assert_error_msg_content_equals( + "Foreign key constraint 'cntr' failed: foreign tuple was not found", + function() city:replace{21, 'earth', 1, 'de', 'Berlin'} end + ) + city:replace{21, 'earth', 1, 'ru', 'Moscow'} + end, {engine}) + + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local city = box.space.city + city:replace{22, 'earth', 1, 'ru', 'Tomsk'} + t.assert_equals(city:select{}, {{21, 'earth', 1, 'ru', 'Moscow'}, + {22, 'earth', 1, 'ru', 'Tomsk'}}) + end, {engine}) + + cg.server:eval('box.snapshot()') + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local country = box.space.country + local city = box.space.city + t.assert_error_msg_content_equals( + "Foreign key 'cntr' integrity check failed: index was not found", + function() country:delete{100} end + ) + city:create_index('name', {parts={{'u'},{'c'},{'p'}},unique=false}) + country:delete{101} + t.assert_error_msg_content_equals( + "Foreign key 'cntr' integrity check failed: tuple is referenced", + function() country:delete{100} end + ) + city:delete{21} + city:delete{22} + country:delete{100} + city:drop() + country:drop() + end, {engine}) +end + +-- The same test as above but with foreign key by numeric space and field. +g.test_complex_foreign_key_numeric = function(cg) + local engine = cg.params.engine + + cg.server:exec(function(engine) + local t = require('luatest') + local country_fmt = {{name='id', type='unsigned'}, + {name='universe_id', type='unsigned'}, + {name='planet_name', type='string'}, + {name='country_code', type='string'}, + {name='name', type='string'}} + --Note: format is not set. + local country = box.schema.create_space('country', {engine=engine}) + country:create_index('pk') + country:replace{100, 1, 'earth', 'ru', 'Russia'} + country:replace{101, 1, 'earth', 'rf', 'France'} + + local function city_space_opts(foreign_key) + local fmt = {{name='id', type='unsigned'}, + {name='p', type='string'}, + {name='u', type='unsigned'}, + {name='c', type='string'}, + {name='name', type='string'}} + return {engine=engine, format=fmt, foreign_key=foreign_key} + end + local fkey = {cntr = {space=country.id, + field={[4]=4, [3]=2, [2]=3}}} + local city = box.schema.create_space('city', city_space_opts(fkey)) + t.assert_equals(city.foreign_key, + {cntr = {field = {[1] = 2, [2] = 1, [3] = 3}, + space = country.id}}); + city:create_index('pk') + + t.assert_equals(country:select{}, {{100, 1, 'earth', 'ru', 'Russia'}, + {101, 1, 'earth', 'rf', 'France'}}) + t.assert_error_msg_content_equals( + "Can't modify space 'country': space is referenced by foreign key", + function() country:drop() end + ) + t.assert_equals(country:select{}, {{100, 1, 'earth', 'ru', 'Russia'}, + {101, 1, 'earth', 'rf', 'France'}}) + t.assert_error_msg_content_equals( + "Foreign key constraint 'cntr' failed: foreign index was not found", + function() city:replace{21, 'earth', 1, 'ru', 'Moscow'} end + ) + country:format(country_fmt) + t.assert_error_msg_content_equals( + "Foreign key constraint 'cntr' failed: foreign index was not found", + function() city:replace{21, 'earth', 1, 'ru', 'Moscow'} end + ) + country:create_index('name1', {parts={{'universe_id'}}, + unique=false}) + country:create_index('name2', {parts={{'country_code'}, + {'universe_id'}}, + unique=false}) + country:create_index('name3', {parts={{'planet_name'}, + {'country_code'}, + {'universe_id'}}}) + t.assert_error_msg_content_equals( + "Foreign key constraint 'cntr' failed: foreign tuple was not found", + function() city:replace{21, 'earth', 1, 'de', 'Berlin'} end + ) + city:replace{21, 'earth', 1, 'ru', 'Moscow'} + end, {engine}) + + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local city = box.space.city + city:replace{22, 'earth', 1, 'ru', 'Tomsk'} + t.assert_equals(city:select{}, {{21, 'earth', 1, 'ru', 'Moscow'}, + {22, 'earth', 1, 'ru', 'Tomsk'}}) + end, {engine}) + + cg.server:eval('box.snapshot()') + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local country = box.space.country + local city = box.space.city + t.assert_error_msg_content_equals( + "Foreign key 'cntr' integrity check failed: index was not found", + function() country:delete{100} end + ) + city:create_index('name', {parts={{'u'},{'c'},{'p'}},unique=false}) + country:delete{101} + t.assert_error_msg_content_equals( + "Foreign key 'cntr' integrity check failed: tuple is referenced", + function() country:delete{100} end + ) + city:delete{21} + city:delete{22} + country:delete{100} + city:drop() + country:drop() + end, {engine}) +end + +-- Test with foreign key and different types of indexes and fields. +g.test_complex_foreign_key_wrong_type = function(cg) + local engine = cg.params.engine + cg.server:exec(function(engine) + local t = require('luatest') + local fmt = {{'id', 'unsigned'}, {'planet_id','unsigned'}, + {'code','string'}, {'name','string'}} + local country = box.schema.create_space('country', {engine=engine, format=fmt}) + country:create_index('pk') + country:create_index('code', {parts={{'planet_id'},{'code'}}}) + country:replace{100, 1, 'ru','Russia'} + + local function city_space_opts(foreign_key) + local fmt = {{'id', 'unsigned'}, {'planet_id'}, {'country_code'}} + return {engine=engine, format=fmt, foreign_key=foreign_key} + end + local fkey = {space='country',field={planet_id='planet_id',country_code='code'}} + local city = box.schema.create_space('city', city_space_opts(fkey)) + city:create_index('pk') + + t.assert_error_msg_content_equals( + "Foreign key constraint 'country' failed: wrong key type", + function() city:replace{1, 1, 1} end + ) + t.assert_error_msg_content_equals( + "Foreign key constraint 'country' failed: wrong key type", + function() city:replace{1,'ru','ru'} end + ) + end, {engine}) + + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local city = box.space.city + + t.assert_error_msg_content_equals( + "Foreign key constraint 'country' failed: wrong key type", + function() city:replace{1, 1, 1} end + ) + t.assert_error_msg_content_equals( + "Foreign key constraint 'country' failed: wrong key type", + function() city:replace{1,'ru','ru'} end + ) + end, {engine}) + + cg.server:eval('box.snapshot()') + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local city = box.space.city + + t.assert_error_msg_content_equals( + "Foreign key constraint 'country' failed: wrong key type", + function() city:replace{1, 1, 1} end + ) + t.assert_error_msg_content_equals( + "Foreign key constraint 'country' failed: wrong key type", + function() city:replace{1,'ru','ru'} end + ) + end, {engine}) + + cg.server:exec(function() + local t = require('luatest') + local country = box.space.country + local city = box.space.city + city:create_index('wrong1', {parts={{'country_code', 'unsigned'},{'planet_id', 'unsigned'}}, unique=false}) + t.assert_error_msg_content_equals( + "Foreign key 'country' integrity check failed: wrong key type", + function() country:delete{100} end + ) + end, {engine}) + + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local country = box.space.country + local city = box.space.city + t.assert_error_msg_content_equals( + "Foreign key 'country' integrity check failed: wrong key type", + function() country:delete{100} end + ) + city.index.wrong1:drop() + city:create_index('wrong2', {parts={{'country_code', 'string'},{'planet_id', 'string'}}, unique=false}) + t.assert_error_msg_content_equals( + "Foreign key 'country' integrity check failed: wrong key type", + function() country:delete{100} end + ) + end, {engine}) + + cg.server:eval('box.snapshot()') + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local country = box.space.country + local city = box.space.city + t.assert_error_msg_content_equals( + "Foreign key 'country' integrity check failed: wrong key type", + function() country:delete{100} end + ) + city.index.wrong2:drop() + end, {engine}) +end + +-- Test upsert of a comple foreign key. +g.test_complex_foreign_key_upsert = function(cg) + local engine = cg.params.engine + cg.server:exec(function(engine) + local t = require('luatest') + local card = box.schema.create_space( + 'card', + { + engine = engine, + format = { + { name='card_id1', type='unsigned' }, + { name='card_id2', type='unsigned' }, + { name='name', type='string' }, + } + } + ) + card:create_index('pk', {parts = {'card_id1', 'card_id2'}}) + + local user = box.schema.create_space( + 'user', + { + engine = engine, + format = { + { name='user_id', type='unsigned' }, + { name='card_id1', type='unsigned', is_nullable=true }, + { name='card_id2', type='unsigned', is_nullable=true }, + { name='name', type='string' }, + }, + foreign_key = { space = 'card', + field = { card_id1 = 'card_id1', + card_id2 = 'card_id2' } } + } + ) + user:create_index('pk') + + card:replace{1, 1, "hehe"} + user:replace{1, 1, 1, "haha"} + + t.assert_error_msg_content_equals( + "Foreign key constraint 'card' failed: foreign tuple was not found", + function() user:upsert({1, 1, 1, "haha"}, {{'=', 2, 2}}) end + ) + end, {engine}) + + cg.server:eval('box.snapshot()') + + cg.server:exec(function() + local t = require('luatest') + local user = box.space.user + t.assert_error_msg_content_equals( + "Foreign key constraint 'card' failed: foreign tuple was not found", + function() user:upsert({1, 1, 1, "haha"}, {{'=', 2, 2}}) end + ) + end) + + cg.server:eval('box.snapshot()') + cg.server:restart() + + cg.server:exec(function() + local t = require('luatest') + local user = box.space.user + t.assert_error_msg_content_equals( + "Foreign key constraint 'card' failed: foreign tuple was not found", + function() user:upsert({1, 1, 1, "haha"}, {{'=', 2, 2}}) end + ) + + box.space.user:drop() + box.space.card:drop() + end) +end -- GitLab