From 9355933f0de0bc4385fe4fdfb24cdece1c3fa232 Mon Sep 17 00:00:00 2001
From: Mergen Imeev <imeevma@tarantool.org>
Date: Fri, 29 Mar 2024 16:46:47 +0300
Subject: [PATCH] experimental: introduce call()

This patch introduces the call() function in the experiment.connpool
module. The function takes the name of the function to be executed,
arguments to the function to be executed, and options. Options are used
to select candidates, on one of which the provided function will be
remotely executed.

Closes #9842

@TarantoolBot document
Title: the experimental.connpool module

The experimental.connpool module contains some functions that allow you
to retrieve the names of instances from the configuration that satisfy
specified conditions, or to execute a function on one of these
instances. Additionally, this module has a function that returns the
active connection to the instance with the specified name.

Currently this module contains three functions:
1) `connect()` takes an instance name and options and returns the active
connection to the instance with the specified name. The
`connect_timeout` and `fetch_schema` options are passed directly to
`net.box.connect()`. The `wait_connected` option is also passed to
`net.box.connect()`, but here it can be `true` or `false`, numeric
values are not allowed.
2) `filter()` accepts only options as its only argument. It returns an
array of instance names that match the provided options. Currently only
two options are supported:
    a) `labels` that instances must have;
    b) `roles` that instances must have.
3) `call` takes the name of the function to execute, arguments for this
function, and options. The `labels` and `roles` options are used to
select candidates on which the function will be executed. If the
`prefer_local` option is false, `call` will try to connect to a random
candidate until a connection is established or there are no more
candidates. If the `prefer_local` option is true or nil and the instance
in which `call` is executed is one of the candidates, `call` will
attempt to connect to that instance first. Moreover, `call` has options
`timeout`, `buffer`, `on_push`, `on_push_ctx` and `is_async`, which are
passed directly to `net.box.call()`. Note that the `connection` option
works for the `net.box.call()` part. The potential maximum execution
time is `<number of candidates> * <default connect_timeout for
net.box.connect()> + connection`.
---
 .../unreleased/gh-9842-rpc-for-roles.md       |   2 +
 src/box/lua/experimental/connpool.lua         |  73 +++++++++++
 test/config-luatest/rpc_test.lua              | 119 ++++++++++++++++++
 3 files changed, 194 insertions(+)

diff --git a/changelogs/unreleased/gh-9842-rpc-for-roles.md b/changelogs/unreleased/gh-9842-rpc-for-roles.md
index e5539f7b64..4f26e4b25c 100644
--- a/changelogs/unreleased/gh-9842-rpc-for-roles.md
+++ b/changelogs/unreleased/gh-9842-rpc-for-roles.md
@@ -4,3 +4,5 @@
   `connect()` function of this module (gh-9842).
 * Introduced the `filter()` function in the `experimental.connpool`
   module (gh-9842).
+* Introduced the `call()` function in the `experimental.connpool`
+  module (gh-9842).
diff --git a/src/box/lua/experimental/connpool.lua b/src/box/lua/experimental/connpool.lua
index 17907c52c2..fdc6c75916 100644
--- a/src/box/lua/experimental/connpool.lua
+++ b/src/box/lua/experimental/connpool.lua
@@ -112,7 +112,80 @@ local function filter(opts)
     return candidates
 end
 
+local function get_connection(all_candidates, prefer_local)
+    local candidates = table.copy(all_candidates)
+    if prefer_local ~= false then
+        local candidate_idx = nil
+        for n, candidate in ipairs(candidates) do
+            if candidate == box.info.name then
+                candidate_idx = n
+                local conn = connect(box.info.name, {wait_connected = false})
+                if conn:wait_connected() then
+                    return conn
+                end
+                break
+            end
+        end
+        if candidate_idx ~= nil then
+            table.remove(candidates, candidate_idx)
+        end
+    end
+
+    while #candidates > 0 do
+        local n = math.random(#candidates)
+        local instance_name = table.remove(candidates, n)
+        local conn = connect(instance_name, {wait_connected = false})
+        if conn:wait_connected() then
+            return conn
+        end
+    end
+    return nil
+end
+
+local function call(func_name, args, opts)
+    checks('string', '?table', {
+        labels = '?table',
+        roles = '?table',
+        prefer_local = '?boolean',
+        -- The following options passed directly to net.box.call().
+        timeout = '?',
+        buffer = '?',
+        on_push = '?function',
+        on_push_ctx = '?',
+        is_async = '?boolean',
+    })
+    opts = opts or {}
+
+    local candidates_opts = {
+        labels = opts.labels,
+        roles = opts.roles,
+    }
+    local candidates = filter(candidates_opts)
+    if next(candidates) == nil then
+        local msg = "Couldn't execute function %s: no candidates are " ..
+                    "available with these conditions"
+        error(msg:format(func_name), 0)
+    end
+
+    local conn = get_connection(candidates, opts.prefer_local)
+    if conn == nil then
+        local msg = "Couldn't execute function %s: connection to " ..
+                    "candidates failed"
+        error(msg:format(func_name), 0)
+    end
+
+    local net_box_call_opts = {
+        timeout = opts.timeout,
+        buffer = opts.buffer,
+        on_push = opts.on_push,
+        on_push_ctx = opts.on_push_ctx,
+        is_async = opts.is_async,
+    }
+    return conn:call(func_name, args, net_box_call_opts)
+end
+
 return {
     connect = connect,
     filter = filter,
+    call = call,
 }
diff --git a/test/config-luatest/rpc_test.lua b/test/config-luatest/rpc_test.lua
index d38d5d9caa..40bed364b9 100644
--- a/test/config-luatest/rpc_test.lua
+++ b/test/config-luatest/rpc_test.lua
@@ -204,3 +204,122 @@ g.test_filter = function(g)
     g.server_3:exec(check)
     g.server_4:exec(check)
 end
+
+g.test_call = function(g)
+    local dir = treegen.prepare_directory(g, {}, {})
+    local config = [[
+    credentials:
+      users:
+        guest:
+          roles: [super]
+
+    iproto:
+      listen:
+        - uri: 'unix/:./{{ instance_name }}.iproto'
+
+    groups:
+      group-001:
+        replicasets:
+          replicaset-001:
+            instances:
+              instance-001:
+                roles: [one]
+                database:
+                  mode: rw
+                labels:
+                  l1: 'first'
+                  l2: 'second'
+              instance-002: {}
+          replicaset-002:
+            instances:
+              instance-003:
+                roles: [one]
+                database:
+                  mode: rw
+                labels:
+                  l2: 'second'
+              instance-004:
+                roles: [one]
+                labels:
+                  l1: 'first'
+                  l3: 'second'
+    ]]
+    treegen.write_script(dir, 'config.yaml', config)
+
+    local role = string.dump(function()
+        local function f1()
+            return box.info.name
+        end
+
+        local function f2(a, b, c)
+            return a + b * c
+        end
+
+        rawset(_G, 'f1', f1)
+        rawset(_G, 'f2', f2)
+
+        return {
+            stop = function() end,
+            apply = function() end,
+            validate = function() end,
+        }
+    end)
+    treegen.write_script(dir, 'one.lua', role)
+
+    local opts = {
+        env = {LUA_PATH = os.environ()['LUA_PATH']},
+        config_file = 'config.yaml',
+        chdir = dir,
+    }
+    g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap())
+    g.server_2 = server:new(fun.chain(opts, {alias = 'instance-002'}):tomap())
+    g.server_3 = server:new(fun.chain(opts, {alias = 'instance-003'}):tomap())
+    g.server_4 = server:new(fun.chain(opts, {alias = 'instance-004'}):tomap())
+
+    g.server_1:start({wait_until_ready = false})
+    g.server_2:start({wait_until_ready = false})
+    g.server_3:start({wait_until_ready = false})
+    g.server_4:start({wait_until_ready = false})
+
+    g.server_1:wait_until_ready()
+    g.server_2:wait_until_ready()
+    g.server_3:wait_until_ready()
+    g.server_4:wait_until_ready()
+
+    -- Make sure module pool is working.
+    local function check()
+        local connpool = require('experimental.connpool')
+        local opts = {
+            labels = {l1 = 'first'},
+            roles = {'one'},
+        }
+        local candidates = connpool.filter(opts)
+
+        local is_candidate = false
+        for _, candidate_name in ipairs(candidates) do
+            is_candidate = is_candidate or candidate_name == box.info.name
+        end
+
+        t.assert(not is_candidate or box.info.name == 'instance-001' or
+                 box.info.name == 'instance-004')
+
+        if is_candidate then
+            t.assert(opts.prefer_local == nil)
+            t.assert_equals(connpool.call('f1', nil, opts), box.info.name)
+            opts.prefer_local = true
+            t.assert_equals(connpool.call('f1', nil, opts), box.info.name)
+        else
+            t.assert(opts.prefer_local == nil)
+            t.assert_items_include(candidates, {connpool.call('f1', nil, opts)})
+            opts.prefer_local = true
+            t.assert_items_include(candidates, {connpool.call('f1', nil, opts)})
+        end
+
+        t.assert_equals(connpool.call('f2', {1,2,3}, {roles = {'one'}}), 7)
+    end
+
+    g.server_1:exec(check)
+    g.server_2:exec(check)
+    g.server_3:exec(check)
+    g.server_4:exec(check)
+end
-- 
GitLab