Skip to content
Snippets Groups Projects

fix: vshard problems

Merged Denis Smirnov requested to merge 667-issue into main
@@ -2,21 +2,22 @@
-- Function, described in this file are called from `vshard` Sbroad module.
local vshard = require('vshard')
local lerror = require('vshard.error')
local vrs = require('vshard.replicaset')
local helper = require('sbroad.helper')
local fiber = require('fiber')
local table = require('table')
local ref_id = 0
local DQL_MIN_TIMEOUT = 10
local REF_MIN_TIMEOUT = 5
local function future_wait(cond, timeout)
local f = function(cond, timeout)
if timeout and timeout < 0 then
error("dql timeout exceeded")
return nil, lerror.make("dql timeout exceeded")
end
local res, err = cond:wait_result(timeout)
if err then
error(err)
return nil, lerror.make(err)
end
return res
end
@@ -24,7 +25,7 @@ local function future_wait(cond, timeout)
if ok then
return res
end
return nil, res
return nil, lerror.make(res)
end
--
@@ -49,10 +50,8 @@ end
-- @param vtable_max_rows Maximum number of rows to receive from storages.
-- If the number is exceeded, the results are discarded and error is returned.
-- @param opts Table which may have the following options:
-- 1. ref_timeout - number of seconds to keep the reference alive. Shouldn't
-- be too big, because alive reference blocks vshard's rebalancing.
-- 2. timeout - timeout for the whole function execution.
-- 3. check_bucket_count - whether or not to check that ref stage has covered
-- 1. timeout - timeout for the whole function execution.
-- 2. check_bucket_count - whether or not to check that ref stage has covered
-- all buckets. If this is true, the error will be returned if router's bucket
-- count != bucket count covered by ref stage. It may mean that router has
-- outdated configuration (some buckets were added/deleted on storages that router
@@ -61,14 +60,12 @@ end
--
local function multi_storage_dql(uuid_to_args, func, vtable_max_rows, opts)
local replicasets = vshard.router.routeall()
local timeout, ref_timeout, check_bucket_count
local timeout, check_bucket_count
if opts then
timeout = opts.map_timeout or DQL_MIN_TIMEOUT
ref_timeout = opts.ref_timeout or REF_MIN_TIMEOUT
timeout = opts.timeout or DQL_MIN_TIMEOUT
check_bucket_count = opts.check_bucket_count or false
else
timeout = DQL_MIN_TIMEOUT
ref_timeout = REF_MIN_TIMEOUT
check_bucket_count = false
end
@@ -85,13 +82,17 @@ local function multi_storage_dql(uuid_to_args, func, vtable_max_rows, opts)
-- Nil checks are done explicitly here (== nil instead of 'not'), because
-- netbox requests return box.NULL instead of nils.
-- Wait for all masters to connect.
vrs.wait_masters_connect(replicasets)
timeout = deadline - fiber.clock()
--
-- Ref stage: send.
--
for uuid, _ in pairs(uuid_to_args) do
local rs = replicasets[uuid]
res, err = rs:callrw('vshard.storage._call',
{'storage_ref', rid, ref_timeout}, opts_ref)
{'storage_ref', rid, timeout}, opts_ref)
if res == nil then
err_uuid = uuid
goto fail
@@ -126,8 +127,9 @@ local function multi_storage_dql(uuid_to_args, func, vtable_max_rows, opts)
-- if the user does a maintenance of some kind by creating/deleting buckets.
-- In both cases can't guarantee all the data would be covered by Map calls.
if check_bucket_count and bucket_count ~= vshard.router.bucket_count() then
err = "All refs are done but not all buckets are covered"
goto fail
err = string.format("Expected %d buckets in the cluster, got %d",
vshard.router.bucket_count(), bucket_count)
goto fail
end
-- Map stage: send.
@@ -251,11 +253,14 @@ _G.dql_on_some = function(uuid_to_args, is_readonly, waiting_timeout, vtable_max
result = helper.unwrap_execute_result(res[1][1])
else
local err, err_uuid
local opts = { map_timeout = waiting_timeout, ref_timeout = waiting_timeout }
local opts = { timeout = waiting_timeout }
result, err, err_uuid = multi_storage_dql(uuid_to_args, exec_fn, vtable_max_rows, opts)
if err ~= nil then
helper.dql_error(err, err_uuid)
end
if result == nil then
error("local execution returned nil")
end
end
return box.tuple.new{result}
@@ -269,14 +274,16 @@ _G.dql_on_all = function(required, optional, waiting_timeout, vtable_max_rows)
uuid_to_args[uuid] = { required = required, optional = optional }
end
local opts = {
map_timeout = waiting_timeout,
ref_timeout = waiting_timeout,
timeout = waiting_timeout,
check_bucket_count = true
}
local result, err, err_uuid = multi_storage_dql(uuid_to_args, exec_fn, vtable_max_rows, opts)
if err ~= nil then
helper.dql_error(err, err_uuid)
end
if result == nil then
error("local execution returned nil")
end
return box.tuple.new{result}
end
Loading