diff --git a/changelogs/unreleased/gh-9842-rpc-for-roles.md b/changelogs/unreleased/gh-9842-rpc-for-roles.md index e5539f7b64ce0f82f099c23e37ad197c0cdedf64..4f26e4b25cc822f2ab7c607a9264734fd48f35d4 100644 --- a/changelogs/unreleased/gh-9842-rpc-for-roles.md +++ b/changelogs/unreleased/gh-9842-rpc-for-roles.md @@ -4,3 +4,5 @@ `connect()` function of this module (gh-9842). * Introduced the `filter()` function in the `experimental.connpool` module (gh-9842). +* Introduced the `call()` function in the `experimental.connpool` + module (gh-9842). diff --git a/src/box/lua/experimental/connpool.lua b/src/box/lua/experimental/connpool.lua index 17907c52c234e8d85b37285d567efe1bd40d2a4c..fdc6c759163d7ad51f209a90dd574db76a704001 100644 --- a/src/box/lua/experimental/connpool.lua +++ b/src/box/lua/experimental/connpool.lua @@ -112,7 +112,80 @@ local function filter(opts) return candidates end +local function get_connection(all_candidates, prefer_local) + local candidates = table.copy(all_candidates) + if prefer_local ~= false then + local candidate_idx = nil + for n, candidate in ipairs(candidates) do + if candidate == box.info.name then + candidate_idx = n + local conn = connect(box.info.name, {wait_connected = false}) + if conn:wait_connected() then + return conn + end + break + end + end + if candidate_idx ~= nil then + table.remove(candidates, candidate_idx) + end + end + + while #candidates > 0 do + local n = math.random(#candidates) + local instance_name = table.remove(candidates, n) + local conn = connect(instance_name, {wait_connected = false}) + if conn:wait_connected() then + return conn + end + end + return nil +end + +local function call(func_name, args, opts) + checks('string', '?table', { + labels = '?table', + roles = '?table', + prefer_local = '?boolean', + -- The following options passed directly to net.box.call(). + timeout = '?', + buffer = '?', + on_push = '?function', + on_push_ctx = '?', + is_async = '?boolean', + }) + opts = opts or {} + + local candidates_opts = { + labels = opts.labels, + roles = opts.roles, + } + local candidates = filter(candidates_opts) + if next(candidates) == nil then + local msg = "Couldn't execute function %s: no candidates are " .. + "available with these conditions" + error(msg:format(func_name), 0) + end + + local conn = get_connection(candidates, opts.prefer_local) + if conn == nil then + local msg = "Couldn't execute function %s: connection to " .. + "candidates failed" + error(msg:format(func_name), 0) + end + + local net_box_call_opts = { + timeout = opts.timeout, + buffer = opts.buffer, + on_push = opts.on_push, + on_push_ctx = opts.on_push_ctx, + is_async = opts.is_async, + } + return conn:call(func_name, args, net_box_call_opts) +end + return { connect = connect, filter = filter, + call = call, } diff --git a/test/config-luatest/rpc_test.lua b/test/config-luatest/rpc_test.lua index d38d5d9caaf542393bb716fb4268ca0cbb432ecf..40bed364b9e1c12d11ccbf4af9df625b25e13974 100644 --- a/test/config-luatest/rpc_test.lua +++ b/test/config-luatest/rpc_test.lua @@ -204,3 +204,122 @@ g.test_filter = function(g) g.server_3:exec(check) g.server_4:exec(check) end + +g.test_call = function(g) + local dir = treegen.prepare_directory(g, {}, {}) + local config = [[ + credentials: + users: + guest: + roles: [super] + + iproto: + listen: + - uri: 'unix/:./{{ instance_name }}.iproto' + + groups: + group-001: + replicasets: + replicaset-001: + instances: + instance-001: + roles: [one] + database: + mode: rw + labels: + l1: 'first' + l2: 'second' + instance-002: {} + replicaset-002: + instances: + instance-003: + roles: [one] + database: + mode: rw + labels: + l2: 'second' + instance-004: + roles: [one] + labels: + l1: 'first' + l3: 'second' + ]] + treegen.write_script(dir, 'config.yaml', config) + + local role = string.dump(function() + local function f1() + return box.info.name + end + + local function f2(a, b, c) + return a + b * c + end + + rawset(_G, 'f1', f1) + rawset(_G, 'f2', f2) + + return { + stop = function() end, + apply = function() end, + validate = function() end, + } + end) + treegen.write_script(dir, 'one.lua', role) + + local opts = { + env = {LUA_PATH = os.environ()['LUA_PATH']}, + config_file = 'config.yaml', + chdir = dir, + } + g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap()) + g.server_2 = server:new(fun.chain(opts, {alias = 'instance-002'}):tomap()) + g.server_3 = server:new(fun.chain(opts, {alias = 'instance-003'}):tomap()) + g.server_4 = server:new(fun.chain(opts, {alias = 'instance-004'}):tomap()) + + g.server_1:start({wait_until_ready = false}) + g.server_2:start({wait_until_ready = false}) + g.server_3:start({wait_until_ready = false}) + g.server_4:start({wait_until_ready = false}) + + g.server_1:wait_until_ready() + g.server_2:wait_until_ready() + g.server_3:wait_until_ready() + g.server_4:wait_until_ready() + + -- Make sure module pool is working. + local function check() + local connpool = require('experimental.connpool') + local opts = { + labels = {l1 = 'first'}, + roles = {'one'}, + } + local candidates = connpool.filter(opts) + + local is_candidate = false + for _, candidate_name in ipairs(candidates) do + is_candidate = is_candidate or candidate_name == box.info.name + end + + t.assert(not is_candidate or box.info.name == 'instance-001' or + box.info.name == 'instance-004') + + if is_candidate then + t.assert(opts.prefer_local == nil) + t.assert_equals(connpool.call('f1', nil, opts), box.info.name) + opts.prefer_local = true + t.assert_equals(connpool.call('f1', nil, opts), box.info.name) + else + t.assert(opts.prefer_local == nil) + t.assert_items_include(candidates, {connpool.call('f1', nil, opts)}) + opts.prefer_local = true + t.assert_items_include(candidates, {connpool.call('f1', nil, opts)}) + end + + t.assert_equals(connpool.call('f2', {1,2,3}, {roles = {'one'}}), 7) + end + + g.server_1:exec(check) + g.server_2:exec(check) + g.server_3:exec(check) + g.server_4:exec(check) +end