diff --git a/sbroad-cartridge/src/api/invalidate_cached_schema.rs b/sbroad-cartridge/src/api/invalidate_cached_schema.rs
index 2a0df21cfc399e073561ee211e20fb02851a982b..786c99f5ea28cefaa84cbebd0b925d252f0a1e5a 100644
--- a/sbroad-cartridge/src/api/invalidate_cached_schema.rs
+++ b/sbroad-cartridge/src/api/invalidate_cached_schema.rs
@@ -40,7 +40,7 @@ pub extern "C" fn invalidate_segment_cache(ctx: FunctionCtx, _: FunctionArgs) ->
         Ok(runtime) => {
             if let Err(e) = runtime.clear_config() {
                 return tarantool_error(&format!(
-                    "Failed to clear the configuration on segment during cache invalidation: {e:?}"
+                    "Failed to clear the storage configuration: {e:?}"
                 ));
             }
             ctx.return_mp(&true).unwrap();
diff --git a/sbroad-cartridge/test_app/test/integration/sql_cache_test.lua b/sbroad-cartridge/test_app/test/integration/sql_cache_test.lua
index 220100859de7837db1554cdbdd94b71195127c7f..9a8a0ad7eb3dace0c657ec0c86a1435d21c5faa8 100644
--- a/sbroad-cartridge/test_app/test/integration/sql_cache_test.lua
+++ b/sbroad-cartridge/test_app/test/integration/sql_cache_test.lua
@@ -43,13 +43,14 @@ g.test_change_cache_by_config_replica = function()
     local cache_before_config = c["storage_cache_size_bytes"]
     t.assert_equals(20480000, cache_before_config)
 
-    -- here we check that storage-1-1 is master and that its config isn't updated
+    -- here we check that storage-1-1 is master
     t.assert_equals(false, storage11:eval("return box.info.ro"))
 
-    -- config was not applied on the master because the first query was 'select' and it was run on the replica
-    t.assert_not_equals(cache_before_config, storage11:eval("return box.cfg.sql_cache_size"))
-    -- but on the replica cache params must be updated
-    t.assert_equals(cache_before_config, storage12:eval("return box.cfg.sql_cache_size"))
+    -- config was applied on the master because the first query was 'select *' and it was run on the master
+    t.assert_equals(cache_before_config, storage11:eval("return box.cfg.sql_cache_size"))
+    -- but on the replica cache params won't be updated
+    t.assert_not_equals(cache_before_config, storage12:eval("return box.cfg.sql_cache_size"))
+    -- storage2 is master so the query was executed there and the cache was updated
     t.assert_equals(cache_before_config, storage2:eval("return box.cfg.sql_cache_size"))
 
     local cache_after_config = 4239361
@@ -62,9 +63,10 @@
 
     t.assert_equals(false, storage11:eval("return box.info.ro"))
 
-    -- on the master config was not applied, because select query was ran on the replica
-    t.assert_not_equals(cache_after_config, storage11:eval("return box.cfg.sql_cache_size"))
-    -- but on the replica cache params must be updated
-    t.assert_equals(cache_after_config, storage12:eval("return box.cfg.sql_cache_size"))
+    -- on the master config was applied, because `select *` was executed on master
+    t.assert_equals(cache_after_config, storage11:eval("return box.cfg.sql_cache_size"))
+    -- but on the replica cache params were not updated
+    t.assert_not_equals(cache_after_config, storage12:eval("return box.cfg.sql_cache_size"))
+    -- storage2 is master so the query was executed there and the cache was updated
     t.assert_equals(cache_after_config, storage2:eval("return box.cfg.sql_cache_size"))
 end
diff --git a/sbroad-core/src/backend/sql/ir.rs b/sbroad-core/src/backend/sql/ir.rs
index e32bb07ccc8597efd4e66a03ca2ff660a802ce59..ed80342e251ecda713d560b4d67964503f0699dd 100644
--- a/sbroad-core/src/backend/sql/ir.rs
+++ b/sbroad-core/src/backend/sql/ir.rs
@@ -1,3 +1,4 @@
+use crate::debug;
 use ahash::AHashMap;
 use opentelemetry::Context;
 use serde::{Deserialize, Serialize};
@@ -7,7 +8,6 @@ use std::fmt::Write as _;
 use tarantool::tlua::{self, Push};
 use tarantool::tuple::{FunctionArgs, Tuple};
 
-use crate::debug;
 use crate::errors::{Action, Entity, SbroadError};
 use crate::executor::bucket::Buckets;
 use crate::executor::ir::ExecutionPlan;
diff --git a/sbroad-core/src/core-router.lua b/sbroad-core/src/core-router.lua
index f5efa2856a12218c308faa99db5d583c0ae1c145..4af155919d8c82a53433201328fc0c582ad85916 100644
--- a/sbroad-core/src/core-router.lua
+++ b/sbroad-core/src/core-router.lua
@@ -3,6 +3,196 @@
 local vshard = require('vshard')
 local helper = require('sbroad.helper')
+local fiber = require('fiber')
+local table = require('table')
+local ref_id = 0
+local DQL_MIN_TIMEOUT = 10
+local REF_MIN_TIMEOUT = 3
+
+local function future_wait(cond, timeout)
+    local f = function(cond, timeout)
+        if timeout and timeout < 0 then
+            error("dql timeout exceeded")
+        end
+        local res, err = cond:wait_result(timeout)
+        if err then
+            error(err)
+        end
+        return res
+    end
+    local ok, res = pcall(f, cond, timeout)
+    if ok then
+        return res
+    end
+    return nil, res
+end
+
+--
+-- Helper function to execute a read request on multiple storages
+-- without buckets being moved between the storages by the vshard
+-- rebalancer. This function is a modified version of `map_callrw`
+-- from the vshard router API (see its documentation for more details).
+--
+-- To ensure data does not move between storages during execution,
+-- there are two stages: ref and map.
+-- 1. The ref stage creates a reference with a deadline on each specified
+-- replicaset's master. While this reference is alive, the master will
+-- neither send nor receive buckets.
+-- 2. The map stage: after references have been created on every master,
+-- a request to execute the given function is sent to each of them. Once
+-- the function has been executed, the reference is deleted. If the
+-- reference expires, an error is returned to the router.
+--
+-- @param uuid_to_args Mapping between a replicaset uuid and the function
+-- arguments for that replicaset.
+-- @param func Name of the function to call.
+-- @param vtable_max_rows Maximum number of rows to receive from the storages.
+-- If the number is exceeded, the results are discarded and an error is returned.
+-- @param opts Table which may have the following options:
+-- 1. ref_timeout - number of seconds to keep the reference alive. Shouldn't
+-- be too big, because an alive reference blocks vshard rebalancing.
+-- 2. map_timeout - timeout for the whole function execution.
+-- 3. check_bucket_count - whether to check that the ref stage has covered
+-- all buckets. If true, an error is returned when the router's bucket
+-- count != the bucket count covered by the ref stage. That may mean the
+-- router has an outdated configuration (some buckets were added/deleted on
+-- storages the router does not know about). Use this option only if you
+-- intend to execute the function on all replicasets and want to ensure
+-- that all buckets were covered.
+--
+local function multi_storage_dql(uuid_to_args, func, vtable_max_rows, opts)
+    local replicasets = vshard.router.routeall()
+    local timeout, ref_timeout, check_bucket_count
+    if opts then
+        timeout = opts.map_timeout or DQL_MIN_TIMEOUT
+        ref_timeout = opts.ref_timeout or REF_MIN_TIMEOUT
+        check_bucket_count = opts.check_bucket_count or false
+    else
+        timeout = DQL_MIN_TIMEOUT
+        ref_timeout = REF_MIN_TIMEOUT
+        check_bucket_count = false
+    end
+
+    local err, err_uuid, res, ok
+    local futures = {}
+    local bucket_count = 0
+    local opts_ref = {is_async = true}
+    local opts_map = {is_async = true}
+    local rs_count = 0
+    local rid = ref_id
+    local deadline = fiber.clock() + timeout
+    local result = nil
+    ref_id = rid + 1
+    -- Nil checks are done explicitly here (== nil instead of 'not'), because
+    -- netbox requests return box.NULL instead of nils.
+
+    --
+    -- Ref stage: send.
+    --
+    for uuid, _ in pairs(uuid_to_args) do
+        local rs = replicasets[uuid]
+        res, err = rs:callrw('vshard.storage._call',
+                             {'storage_ref', rid, ref_timeout}, opts_ref)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        futures[uuid] = res
+        rs_count = rs_count + 1
+    end
+    --
+    -- Ref stage: collect.
+    --
+    for uuid, future in pairs(futures) do
+        res, err = future_wait(future, timeout)
+        -- Handle the netbox error first.
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        -- A ref call returns either nil, err or the bucket count.
+        res, err = res[1], res[2]
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        bucket_count = bucket_count + res
+        timeout = deadline - fiber.clock()
+    end
+
+    -- All refs are done, but not all buckets are covered. This is odd and can
+    -- mean many things. The most likely ones: 1) the router has an outdated
+    -- configuration and does not see another replicaset with more buckets;
+    -- 2) some buckets are simply lost or duplicated - this could happen as a
+    -- bug, or if the user does some kind of maintenance by creating/deleting
+    -- buckets. In both cases we can't guarantee that all the data would be
+    -- covered by the map calls.
+    if check_bucket_count and bucket_count ~= vshard.router.bucket_count() then
+        err = "All refs are done but not all buckets are covered"
+        goto fail
+    end
+
+    --
+    -- Map stage: send.
+    --
+    for uuid, rs_args in pairs(uuid_to_args) do
+        local rs = replicasets[uuid]
+        local args = {'storage_map', rid, 'box.schema.func.call', {func, rs_args['required'], rs_args['optional']}}
+        res, err = rs:callrw('vshard.storage._call', args, opts_map)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        futures[uuid] = res
+    end
+    --
+    -- Map stage: collect.
+    --
+    for uuid, f in pairs(futures) do
+        res, err = future_wait(f, timeout)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        -- A map call returns either true, res or nil, err.
+        ok, res = res[1], res[2]
+        if ok == nil then
+            err = res
+            err_uuid = uuid
+            goto fail
+        end
+        if res ~= nil then
+            local data = res[1]
+            if result == nil then
+                result = data
+            else
+                local new_vt_rows = #data.rows + #result.rows
+                if vtable_max_rows ~= 0 and vtable_max_rows < new_vt_rows then
+                    err = helper.vtable_limit_exceeded(vtable_max_rows, new_vt_rows)
+                    err_uuid = uuid
+                    goto fail
+                end
+                for _, row in ipairs(data.rows) do
+                    table.insert(result.rows, row)
+                end
+            end
+        end
+        timeout = deadline - fiber.clock()
+    end
+    do return result end
+
+    ::fail::
+    for uuid, f in pairs(futures) do
+        f:discard()
+        -- Best effort to remove the created refs before exiting. This can help
+        -- if the timeout was big and the error happened early.
+        f = replicasets[uuid]:callrw('vshard.storage._call',
+                                     {'storage_unref', rid}, opts_ref)
+        if f ~= nil then
+            -- We don't care about waiting for the result - there is no time
+            -- for that. It won't affect sending the request as long as the
+            -- connection is still alive.
+            f:discard()
+        end
+    end
+    return nil, err, err_uuid
+end
 
 _G.group_buckets_by_replicasets = function(buckets)
     local map = {}
@@ -18,72 +208,82 @@ _G.group_buckets_by_replicasets = function(buckets)
     return map
 end
 
-_G.dql_on_some = function(tbl_rs_ir, is_readonly, waiting_timeout, vtable_max_rows)
-    local result = nil
-    local futures = {}
+_G.dql_on_some = function(uuid_to_args, is_readonly, waiting_timeout, vtable_max_rows)
+    local result
+    local call_opts = { is_async = true }
 
-    for rs_uuid, map in pairs(tbl_rs_ir) do
+    local exec_fn = helper.module_name() .. ".execute"
+    -- uuid_to_args is keyed by uuid strings, so count the keys explicitly.
+    local rs_count = 0
+    for _ in pairs(uuid_to_args) do
+        rs_count = rs_count + 1
+    end
+    if rs_count == 1 then
+        -- When a read request is executed on a single storage,
+        -- we don't care about bucket rebalancing.
+        local rs_uuid, rs_args = next(uuid_to_args)
         local replica = vshard.router.routeall()[rs_uuid]
-        local required = map["required"]
-        local optional = map["optional"]
-        local exec_fn = helper.module_name() .. ".execute"
+        local future, err
+        local args = { rs_args['required'], rs_args['optional'] }
         if is_readonly then
-            local future, err = replica:callbre(
-                exec_fn,
-                { required, optional },
-                { is_async = true }
+            future, err = replica:callbre(
+                exec_fn,
+                args,
+                call_opts
            )
-            if err ~= nil then
-                error(error)
-            end
-            table.insert(futures, future)
         else
-            local future, err = replica:callrw(
-                exec_fn,
-                { required, optional },
-                { is_async = true }
+            future, err = replica:callrw(
+                exec_fn,
+                args,
+                call_opts
            )
-            if err ~= nil then
-                error(error)
-            end
-            table.insert(futures, future)
         end
-    end
-
-    for _, future in ipairs(futures) do
+        if err ~= nil then
+            error(err)
+        end
         future:wait_result(waiting_timeout)
+        -- The vtable_max_rows limit was already checked on the
+        -- storage. No need to check it here.
        local res, err = future:result()
-
        if err ~= nil then
            error(err)
        end
-
-        local data = res[1][1][1]
-        if result == nil then
-            result = data
-        else
-            local new_vt_rows = #data.rows + #result.rows
-            if vtable_max_rows ~= 0 and vtable_max_rows < new_vt_rows then
-                error(helper.vtable_limit_exceeded(vtable_max_rows, new_vt_rows))
-            end
-            for _, row in ipairs(data.rows) do
-                table.insert(result.rows, row)
-            end
+        result = res[1][1][1]
+    else
+        local err, err_uuid
+        local opts = { map_timeout = waiting_timeout }
+        result, err, err_uuid = multi_storage_dql(uuid_to_args, exec_fn, vtable_max_rows, opts)
+        if err ~= nil then
+            helper.dql_error(err, err_uuid)
        end
    end

    return box.tuple.new{result}
end

+_G.dql_on_all = function(required, optional, waiting_timeout, vtable_max_rows)
+    local replicasets = vshard.router.routeall()
+    local exec_fn = helper.module_name() .. ".execute"
+    local uuid_to_args = {}
+    for uuid, _ in pairs(replicasets) do
+        uuid_to_args[uuid] = { required = required, optional = optional }
+    end
+    local opts = {
+        map_timeout = waiting_timeout,
+        check_bucket_count = true
+    }
+    local result, err, err_uuid = multi_storage_dql(uuid_to_args, exec_fn, vtable_max_rows, opts)
+    if err ~= nil then
+        helper.dql_error(err, err_uuid)
+    end
+
+    return box.tuple.new{result}
+end
+
 _G.dml_on_some = function(tbl_rs_ir, is_readonly, waiting_timeout)
     local result = nil
+    local exec_fn = helper.module_name() .. ".execute"
".execute" local futures = {} for rs_uuid, map in pairs(tbl_rs_ir) do local replica = vshard.router.routeall()[rs_uuid] local required = map["required"] local optional = map["optional"] - local exec_fn = helper.module_name() .. ".execute" if is_readonly then local future, err = replica:callbre( exec_fn, @@ -91,7 +291,7 @@ _G.dml_on_some = function(tbl_rs_ir, is_readonly, waiting_timeout) { is_async = true } ) if err ~= nil then - error(error) + error(err) end table.insert(futures, future) else @@ -101,7 +301,7 @@ _G.dml_on_some = function(tbl_rs_ir, is_readonly, waiting_timeout) { is_async = true } ) if err ~= nil then - error(error) + error(err) end table.insert(futures, future) end @@ -125,60 +325,6 @@ _G.dml_on_some = function(tbl_rs_ir, is_readonly, waiting_timeout) return box.tuple.new{result} end -_G.dql_on_all = function(required, optional, is_readonly, waiting_timeout, vtable_max_rows) - local replicas = vshard.router.routeall() - local result = nil - local futures = {} - local exec_fn = helper.module_name() .. ".execute" - - for _, replica in pairs(replicas) do - if is_readonly then - local future, err = replica:callbre( - exec_fn, - { required, optional }, - { is_async = true } - ) - if err ~= nil then - error(err) - end - table.insert(futures, future) - else - local future, err = replica:callrw( - exec_fn, - { required, optional }, - { is_async = true } - ) - if err ~= nil then - error(err) - end - table.insert(futures, future) - end - end - - for _, future in ipairs(futures) do - future:wait_result(waiting_timeout) - local res, err = future:result() - - if res == nil then - error(err) - end - - local data = res[1][1][1] - if result == nil then - result = data - else - local new_vt_rows = #data.rows + #result.rows - if vtable_max_rows ~= 0 and vtable_max_rows < new_vt_rows then - error(helper.vtable_limit_exceeded(vtable_max_rows, new_vt_rows)) - end - for _, row in ipairs(data.rows) do - table.insert(result.rows, row) - end - end - end - - return box.tuple.new{result} -end _G.dml_on_all = function(required, optional, is_readonly, waiting_timeout) local replicas = vshard.router.routeall() diff --git a/sbroad-core/src/executor/engine/helpers/vshard.rs b/sbroad-core/src/executor/engine/helpers/vshard.rs index de7b58d98aa9dc723185b5106726fba61384f587..9696ffd009640e10bf9577e44b82acc7ba8d5842 100644 --- a/sbroad-core/src/executor/engine/helpers/vshard.rs +++ b/sbroad-core/src/executor/engine/helpers/vshard.rs @@ -95,7 +95,6 @@ fn dql_on_all( metadata: &impl Metadata, required: Binary, optional: Binary, - is_readonly: bool, vtable_max_rows: u64, ) -> Result<Box<dyn Any>, SbroadError> { let lua = tarantool::lua_state(); @@ -107,7 +106,6 @@ fn dql_on_all( match exec_sql.call_with_args::<Tuple, _>(( required, optional, - is_readonly, waiting_timeout, vtable_max_rows, )) { @@ -163,13 +161,7 @@ pub fn exec_ir_on_all_buckets( vtable_max_rows: u64, ) -> Result<Box<dyn Any>, SbroadError> { match &query_type { - QueryType::DQL => dql_on_all( - metadata, - required, - optional, - conn_type.is_readonly(), - vtable_max_rows, - ), + QueryType::DQL => dql_on_all(metadata, required, optional, vtable_max_rows), QueryType::DML => dml_on_all(metadata, required, optional, conn_type.is_readonly()), } } diff --git a/sbroad-core/src/helper.lua b/sbroad-core/src/helper.lua index c8e1ad0610e80337a306870da1b8a34f7e0f2e95..84712c728852b59d2691cac83d2f1af7d09ab1cc 100644 --- a/sbroad-core/src/helper.lua +++ b/sbroad-core/src/helper.lua @@ -48,9 +48,26 @@ local function vtable_limit_exceeded(limit, current_val) 
     return string.format("Exceeded maximum number of rows (%d) in virtual table: %d", limit, current_val)
 end
 
+local function dql_error(err, rs_uuid)
+    if type(err) ~= 'table' and type(err) ~= 'string' then
+        io.stderr:write(string.format("expected string or table, got: %s\n", type(err)))
+        error(err)
+    end
+    if type(err) == 'table' then
+        local meta_t = getmetatable(err) or {} -- a plain table may come without a metatable
+        meta_t.__tostring = function (self)
+            return self.message
+        end
+        err.uuid = rs_uuid
+        setmetatable(err, meta_t)
+    end
+    error(err)
+end
+
 return {
     module_name = module_name,
     vtable_limit_exceeded = vtable_limit_exceeded,
+    dql_error = dql_error,
     format_result = format_result,
     constants = constants
 }
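
For orientation (not part of the patch): a hedged usage sketch of the two router entry points introduced above, `_G.dql_on_some` and `_G.dql_on_all`. It assumes a bootstrapped vshard cluster with the sbroad `<module>.execute` procedure installed on the storages; the replicaset UUIDs and the `required_bin`/`optional_bin` values are placeholders for the msgpack-encoded plan parts that the Rust executor normally supplies.

```lua
-- Illustrative only: normally the Rust executor drives these functions through
-- tarantool::lua_state(); the UUIDs and binaries below are placeholders.
local required_bin, optional_bin = '<msgpack required>', '<msgpack optional>'

-- One entry per replicaset that owns buckets touched by the query.
local uuid_to_args = {
    ['aaaaaaaa-aaaa-4011-8000-000000000001'] = { required = required_bin, optional = optional_bin },
    ['aaaaaaaa-aaaa-4011-8000-000000000002'] = { required = required_bin, optional = optional_bin },
}

-- Read-only DQL on a subset of replicasets: a single-entry map goes straight
-- to that storage, a multi-entry map goes through the ref/map machinery.
local some = _G.dql_on_some(uuid_to_args, true, 10, 5000)

-- DQL that must visit every replicaset: check_bucket_count is enabled inside,
-- so the call also verifies that the refs covered all buckets.
local all = _G.dql_on_all(required_bin, optional_bin, 10, 5000)
```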
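
And a minimal sketch of what `helper.dql_error` does with a table error: it attaches the uuid of the failing replicaset and installs a `__tostring` handler so the error renders as its `message`. The error payload below is fabricated for illustration.

```lua
local helper = require('sbroad.helper')

-- A table error with a `message` field, as it may arrive from a storage.
local storage_err = setmetatable(
    { message = 'Exceeded maximum number of rows (10) in virtual table: 12' }, {})

local ok, err = pcall(helper.dql_error, storage_err, 'bbbbbbbb-bbbb-4011-8000-000000000001')
assert(not ok)
assert(err.uuid == 'bbbbbbbb-bbbb-4011-8000-000000000001')
assert(tostring(err) == 'Exceeded maximum number of rows (10) in virtual table: 12')
```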