Skip to content
Snippets Groups Projects
Commit 9355933f authored by Mergen Imeev's avatar Mergen Imeev Committed by Alexander Turenko
Browse files

experimental: introduce call()

This patch introduces the call() function in the experiment.connpool
module. The function takes the name of the function to be executed,
arguments to the function to be executed, and options. Options are used
to select candidates, on one of which the provided function will be
remotely executed.

Closes #9842

@TarantoolBot document
Title: the experimental.connpool module

The experimental.connpool module contains some functions that allow you
to retrieve the names of instances from the configuration that satisfy
specified conditions, or to execute a function on one of these
instances. Additionally, this module has a function that returns the
active connection to the instance with the specified name.

Currently this module contains three functions:
1) `connect()` takes an instance name and options and returns the active
connection to the instance with the specified name. The
`connect_timeout` and `fetch_schema` options are passed directly to
`net.box.connect()`. The `wait_connected` option is also passed to
`net.box.connect()`, but here it can be `true` or `false`, numeric
values are not allowed.
2) `filter()` accepts only options as its only argument. It returns an
array of instance names that match the provided options. Currently only
two options are supported:
    a) `labels` that instances must have;
    b) `roles` that instances must have.
3) `call` takes the name of the function to execute, arguments for this
function, and options. The `labels` and `roles` options are used to
select candidates on which the function will be executed. If the
`prefer_local` option is false, `call` will try to connect to a random
candidate until a connection is established or there are no more
candidates. If the `prefer_local` option is true or nil and the instance
in which `call` is executed is one of the candidates, `call` will
attempt to connect to that instance first. Moreover, `call` has options
`timeout`, `buffer`, `on_push`, `on_push_ctx` and `is_async`, which are
passed directly to `net.box.call()`. Note that the `connection` option
works for the `net.box.call()` part. The potential maximum execution
time is `<number of candidates> * <default connect_timeout for
net.box.connect()> + connection`.
parent 196e647e
No related branches found
No related tags found
No related merge requests found
......@@ -4,3 +4,5 @@
`connect()` function of this module (gh-9842).
* Introduced the `filter()` function in the `experimental.connpool`
module (gh-9842).
* Introduced the `call()` function in the `experimental.connpool`
module (gh-9842).
......@@ -112,7 +112,80 @@ local function filter(opts)
return candidates
end
local function get_connection(all_candidates, prefer_local)
local candidates = table.copy(all_candidates)
if prefer_local ~= false then
local candidate_idx = nil
for n, candidate in ipairs(candidates) do
if candidate == box.info.name then
candidate_idx = n
local conn = connect(box.info.name, {wait_connected = false})
if conn:wait_connected() then
return conn
end
break
end
end
if candidate_idx ~= nil then
table.remove(candidates, candidate_idx)
end
end
while #candidates > 0 do
local n = math.random(#candidates)
local instance_name = table.remove(candidates, n)
local conn = connect(instance_name, {wait_connected = false})
if conn:wait_connected() then
return conn
end
end
return nil
end
local function call(func_name, args, opts)
checks('string', '?table', {
labels = '?table',
roles = '?table',
prefer_local = '?boolean',
-- The following options passed directly to net.box.call().
timeout = '?',
buffer = '?',
on_push = '?function',
on_push_ctx = '?',
is_async = '?boolean',
})
opts = opts or {}
local candidates_opts = {
labels = opts.labels,
roles = opts.roles,
}
local candidates = filter(candidates_opts)
if next(candidates) == nil then
local msg = "Couldn't execute function %s: no candidates are " ..
"available with these conditions"
error(msg:format(func_name), 0)
end
local conn = get_connection(candidates, opts.prefer_local)
if conn == nil then
local msg = "Couldn't execute function %s: connection to " ..
"candidates failed"
error(msg:format(func_name), 0)
end
local net_box_call_opts = {
timeout = opts.timeout,
buffer = opts.buffer,
on_push = opts.on_push,
on_push_ctx = opts.on_push_ctx,
is_async = opts.is_async,
}
return conn:call(func_name, args, net_box_call_opts)
end
return {
connect = connect,
filter = filter,
call = call,
}
......@@ -204,3 +204,122 @@ g.test_filter = function(g)
g.server_3:exec(check)
g.server_4:exec(check)
end
g.test_call = function(g)
local dir = treegen.prepare_directory(g, {}, {})
local config = [[
credentials:
users:
guest:
roles: [super]
iproto:
listen:
- uri: 'unix/:./{{ instance_name }}.iproto'
groups:
group-001:
replicasets:
replicaset-001:
instances:
instance-001:
roles: [one]
database:
mode: rw
labels:
l1: 'first'
l2: 'second'
instance-002: {}
replicaset-002:
instances:
instance-003:
roles: [one]
database:
mode: rw
labels:
l2: 'second'
instance-004:
roles: [one]
labels:
l1: 'first'
l3: 'second'
]]
treegen.write_script(dir, 'config.yaml', config)
local role = string.dump(function()
local function f1()
return box.info.name
end
local function f2(a, b, c)
return a + b * c
end
rawset(_G, 'f1', f1)
rawset(_G, 'f2', f2)
return {
stop = function() end,
apply = function() end,
validate = function() end,
}
end)
treegen.write_script(dir, 'one.lua', role)
local opts = {
env = {LUA_PATH = os.environ()['LUA_PATH']},
config_file = 'config.yaml',
chdir = dir,
}
g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap())
g.server_2 = server:new(fun.chain(opts, {alias = 'instance-002'}):tomap())
g.server_3 = server:new(fun.chain(opts, {alias = 'instance-003'}):tomap())
g.server_4 = server:new(fun.chain(opts, {alias = 'instance-004'}):tomap())
g.server_1:start({wait_until_ready = false})
g.server_2:start({wait_until_ready = false})
g.server_3:start({wait_until_ready = false})
g.server_4:start({wait_until_ready = false})
g.server_1:wait_until_ready()
g.server_2:wait_until_ready()
g.server_3:wait_until_ready()
g.server_4:wait_until_ready()
-- Make sure module pool is working.
local function check()
local connpool = require('experimental.connpool')
local opts = {
labels = {l1 = 'first'},
roles = {'one'},
}
local candidates = connpool.filter(opts)
local is_candidate = false
for _, candidate_name in ipairs(candidates) do
is_candidate = is_candidate or candidate_name == box.info.name
end
t.assert(not is_candidate or box.info.name == 'instance-001' or
box.info.name == 'instance-004')
if is_candidate then
t.assert(opts.prefer_local == nil)
t.assert_equals(connpool.call('f1', nil, opts), box.info.name)
opts.prefer_local = true
t.assert_equals(connpool.call('f1', nil, opts), box.info.name)
else
t.assert(opts.prefer_local == nil)
t.assert_items_include(candidates, {connpool.call('f1', nil, opts)})
opts.prefer_local = true
t.assert_items_include(candidates, {connpool.call('f1', nil, opts)})
end
t.assert_equals(connpool.call('f2', {1,2,3}, {roles = {'one'}}), 7)
end
g.server_1:exec(check)
g.server_2:exec(check)
g.server_3:exec(check)
g.server_4:exec(check)
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment