From 324872ab22b36e743eecc0a73cb8a7f0c162b848 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov <vdavydov@tarantool.org> Date: Tue, 15 Aug 2023 10:31:34 +0300 Subject: [PATCH] iproto: introduce box.session.new Closes #8801 @TarantoolBot document Title: Document `box.session.new` The new function creates a new session given a table of options: - `type`: string, optional. Default: "binary". [Type][session-type] of the new session. Currently, only "binary" is supported, which creates a new IPROTO session. - `fd`: number, mandatory. File descriptor number (fd) to be used for the new connection input-output. The fd must refer to a [socket] and be switched to the [non-blocking mode][socket-nonblock] but this isn't enforced, i.e. the user may pass an invalid fd, in which case the connection won't work as expected. - `user`: string, optional. Default: "guest". Name of the user to authenticate the new session as. Note, this doesn't prevent the other end to perform explicit [authentication]. - `storage`: table, optional. Default: empty table. Initial value of the [session-local storage][session-storage]. On success, `box.session.new` takes ownership of the `fd` and returns nothing. On failure, an error is raised. Possible errors: - Invalid option value type. - `fd` isn't specified or has an invalid value. - `box.cfg` wasn't called. - `user` doesn't exist. Example: The code below creates a TCP server that accepts all incoming IPROTO connections on port 3301, authenticates them as 'admin' and sets the session-local storage to `{foo = 'bar'}`. ```lua box.cfg() require('socket').tcp_server('localhost', 3301, function(s) box.session.new({ type = 'binary', fd = s:fd(), user = 'admin', storage = {foo = 'bar'}, }) s:detach() end) ``` Notes: - `box.cfg` must be called before using `box.session.new` to start IPROTO threads. Setting [`box.cfg.listen`][box-cfg-listen] isn't required though. - The socket object must be detached after passing its fd to `box.session.new`, otherwise the fd would be closed on Lua garbage collection. [authentication]: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/authentication/ [box-cfg-listen]: https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-listen [session-storage]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_session/storage/ [session-type]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_session/type/ [socket]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/socket/ [socket-nonblock]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/socket/#socket-nonblock --- .../gh-8801-iproto-session-from-fd.md | 4 + src/box/iproto.cc | 106 ++++++++- src/box/iproto.h | 25 ++ src/box/lua/iproto.c | 62 ++++- src/box/lua/session.lua | 25 ++ ...ort_iproto_constants_and_features_test.lua | 8 +- .../gh_8801_iproto_session_from_fd_test.lua | 220 ++++++++++++++++++ 7 files changed, 427 insertions(+), 23 deletions(-) create mode 100644 changelogs/unreleased/gh-8801-iproto-session-from-fd.md create mode 100644 test/box-luatest/gh_8801_iproto_session_from_fd_test.lua diff --git a/changelogs/unreleased/gh-8801-iproto-session-from-fd.md b/changelogs/unreleased/gh-8801-iproto-session-from-fd.md new file mode 100644 index 0000000000..b427c5a0d3 --- /dev/null +++ b/changelogs/unreleased/gh-8801-iproto-session-from-fd.md @@ -0,0 +1,4 @@ +## feature/box + +* Introduced the new function `box.session.new` for creating a new IPROTO + session from a socket file descriptor number (gh-8801). diff --git a/src/box/iproto.cc b/src/box/iproto.cc index d0ec171468..6af38911a2 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -358,6 +358,12 @@ struct iproto_msg }; /** Peer address size. */ socklen_t addrlen; + /** + * Session to use for the new connection. + * Optional. If omitted, a new session object + * will be created in the TX thread. + */ + struct session *session; } connect; /** Box request, if this is a DML */ struct request dml; @@ -2760,7 +2766,12 @@ tx_process_connect(struct cmsg *m) struct iproto_msg *msg = (struct iproto_msg *) m; struct iproto_connection *con = msg->connection; struct obuf *out = msg->connection->tx.p_obuf; - con->session = session_new(SESSION_TYPE_BINARY); + if (msg->connect.session != NULL) { + con->session = msg->connect.session; + session_set_type(con->session, SESSION_TYPE_BINARY); + } else { + con->session = session_new(SESSION_TYPE_BINARY); + } con->session->meta.connection = con; session_set_peer_addr(con->session, &msg->connect.addr, msg->connect.addrlen); @@ -2824,18 +2835,23 @@ net_send_greeting(struct cmsg *m) /** * Create a connection and start input. + * + * If session is NULL, a new session object will be created for the connection + * in the TX thread. + * + * The function takes ownership of the passed IO stream and session. */ static void -iproto_on_accept(struct evio_service *service, struct iostream *io, - struct sockaddr *addr, socklen_t addrlen) +iproto_thread_accept(struct iproto_thread *iproto_thread, struct iostream *io, + struct sockaddr *addr, socklen_t addrlen, + struct session *session) { - struct iproto_thread *iproto_thread = - (struct iproto_thread *)service->on_accept_param; struct iproto_connection *con = iproto_connection_new(iproto_thread); struct iproto_msg *msg = iproto_msg_new(con); assert(addrlen <= sizeof(msg->connect.addrstorage)); memcpy(&msg->connect.addrstorage, addr, addrlen); msg->connect.addrlen = addrlen; + msg->connect.session = session; iostream_move(&con->io, io); cmsg_init(&msg->base, iproto_thread->connect_route); msg->p_ibuf = con->p_ibuf; @@ -2843,6 +2859,16 @@ iproto_on_accept(struct evio_service *service, struct iostream *io, cpipe_push(&iproto_thread->tx_pipe, &msg->base); } +static void +iproto_on_accept_cb(struct evio_service *service, struct iostream *io, + struct sockaddr *addr, socklen_t addrlen) +{ + struct iproto_thread *iproto_thread = + (struct iproto_thread *)service->on_accept_param; + iproto_thread_accept(iproto_thread, io, addr, addrlen, + /*session=*/NULL); +} + /** * The network io thread main function: * begin serving the message bus. @@ -2861,7 +2887,7 @@ net_cord_f(va_list ap) sizeof(struct iproto_stream)); evio_service_create(loop(), &iproto_thread->binary, "binary", - iproto_on_accept, iproto_thread); + iproto_on_accept_cb, iproto_thread); char endpoint_name[ENDPOINT_NAME_MAX]; snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u", @@ -3199,6 +3225,10 @@ enum iproto_cfg_op { * reset. */ IPROTO_CFG_OVERRIDE, + /** + * Command code to create a new IPROTO session. + */ + IPROTO_CFG_SESSION_NEW, }; /** @@ -3216,6 +3246,12 @@ struct iproto_cfg_msg: public cbus_call_msg struct iproto_stats *stats; /** New iproto max message count. */ int iproto_msg_max; + struct { + /** New connection IO stream. */ + struct iostream io; + /** New connection session. */ + struct session *session; + } session_new; struct { /** Overridden request type. */ uint32_t req_type; @@ -3296,13 +3332,29 @@ iproto_do_cfg_f(struct cbus_call_msg *m) mh_i32_del(req_handlers, k, NULL); } break; + case IPROTO_CFG_SESSION_NEW: { + struct iostream *io = &cfg_msg->session_new.io; + struct session *session = cfg_msg->session_new.session; + struct sockaddr_storage addrstorage; + struct sockaddr *addr = (struct sockaddr *)&addrstorage; + socklen_t addrlen = sizeof(addrstorage); + if (sio_getpeername(io->fd, addr, &addrlen) != 0) + addrlen = 0; + iproto_thread_accept(iproto_thread, io, addr, addrlen, session); + break; + } default: unreachable(); } return 0; } -static inline void +/** + * Sends a configuration message to an IPROTO thread and waits for completion. + * + * The message may be allocated on stack. + */ +static void iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg) { msg->iproto_thread = iproto_thread; @@ -3312,6 +3364,28 @@ iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg) (void)rc; } +static int +iproto_do_cfg_async_free_f(struct cbus_call_msg *m) +{ + free(m); + return 0; +} + +/** + * Sends a configuration message to an IPROTO thread without waiting for + * completion. + * + * The message must be allocated with malloc. + */ +static void +iproto_do_cfg_async(struct iproto_thread *iproto_thread, + struct iproto_cfg_msg *msg) +{ + msg->iproto_thread = iproto_thread; + cbus_call_async(&iproto_thread->net_pipe, &iproto_thread->tx_pipe, + msg, iproto_do_cfg_f, iproto_do_cfg_async_free_f); +} + /** Send IPROTO_CFG_STOP to all threads. */ static void iproto_send_stop_msg(void) @@ -3451,6 +3525,24 @@ iproto_set_msg_max(int new_iproto_msg_max) return 0; } +uint64_t +iproto_session_new(struct iostream *io, struct user *user) +{ + assert(iostream_is_initialized(io)); + struct session *session = session_new(SESSION_TYPE_BACKGROUND); + if (user != NULL) + credentials_reset(&session->credentials, user); + struct iproto_cfg_msg *cfg_msg = + (struct iproto_cfg_msg *)xmalloc(sizeof(*cfg_msg)); + iproto_cfg_msg_create(cfg_msg, IPROTO_CFG_SESSION_NEW); + iostream_move(&cfg_msg->session_new.io, io); + cfg_msg->session_new.session = session; + static int thread = 0; + thread = (thread + 1) % iproto_threads_count; + iproto_do_cfg_async(&iproto_threads[thread], cfg_msg); + return session->id; +} + /** * Notifies IPROTO threads that a new request handler has been set. */ diff --git a/src/box/iproto.h b/src/box/iproto.h index 9ce71927ec..56ad378721 100644 --- a/src/box/iproto.h +++ b/src/box/iproto.h @@ -32,11 +32,14 @@ */ #include <stddef.h> +#include <stdint.h> #include "box/box.h" struct uri_set; struct session; +struct user; +struct iostream; #if defined(__cplusplus) extern "C" { @@ -141,6 +144,28 @@ iproto_listen(const struct uri_set *uri_set); int iproto_set_msg_max(int iproto_msg_max); +/** + * Creates a new IPROTO session over the given IO stream and returns the new + * session id. Never fails. Doesn't yield. + * + * The IO stream must refer to a non-blocking socket but this isn't enforced by + * this function. If it isn't so, the new connection may not work as expected. + * + * If the user argument isn't NULL, the new session will be authenticated as + * the specified user. Otherwise, it will be authenticated as guest. + * + * The function takes ownership of the passed IO stream by moving it to the + * new IPROTO connection (see iostream_move). + * + * Essentially, this function passes the IO stream to the callback invoked + * by an IPROTO thread upon accepting a new connection on a listening socket. + * The callback creates a new IPROTO connection, attaches it to the given + * session, then sends the greeting message and starts processing requests as + * usual. All of this is done asynchronously by an IPROTO thread. + */ +uint64_t +iproto_session_new(struct iostream *io, struct user *user); + /** * Sends a packet with the given header and body over the IPROTO session's * socket. diff --git a/src/box/lua/iproto.c b/src/box/lua/iproto.c index 5fc19ecedd..e114ecfbd4 100644 --- a/src/box/lua/iproto.c +++ b/src/box/lua/iproto.c @@ -11,8 +11,10 @@ #include "box/iproto.h" #include "box/iproto_constants.h" #include "box/iproto_features.h" +#include "box/user.h" #include "core/assoc.h" +#include "core/iostream.h" #include "core/fiber.h" #include "core/tt_static.h" @@ -178,6 +180,45 @@ push_iproto_protocol_features(struct lua_State *L) lua_setfield(L, -2, "feature"); } +/** + * Internal Lua wrapper around iproto_session_new. + * + * Takes fd number (mandatory) and user name (optional, default is guest). + * Returns the new session id on success. On error, raises an exception. + */ +static int +lbox_iproto_session_new(struct lua_State *L) +{ + if (lua_isnoneornil(L, 1)) { + diag_set(ClientError, ER_ILLEGAL_PARAMS, + "options parameter 'fd' is mandatory"); + return luaT_error(L); + } + int fd; + if (!luaL_tointeger_strict(L, 1, &fd) || fd < 0) { + diag_set(ClientError, ER_ILLEGAL_PARAMS, + "options parameter 'fd' must be nonnegative integer"); + return luaT_error(L); + } + if (!box_is_configured()) { + diag_set(ClientError, ER_UNCONFIGURED); + return luaT_error(L); + } + struct user *user = NULL; + if (!lua_isnil(L, 2)) { + size_t name_len; + const char *name = luaL_checklstring(L, 2, &name_len); + user = user_find_by_name(name, name_len); + if (user == NULL) + return luaT_error(L); + } + struct iostream io; + plain_iostream_create(&io, fd); + uint64_t sid = iproto_session_new(&io, user); + luaL_pushuint64(L, sid); + return 1; +} + /** * Encodes a packet header/body argument to MsgPack: if the argument is a * string, then no encoding is needed — otherwise the argument must be a Lua @@ -340,22 +381,23 @@ void box_lua_iproto_init(struct lua_State *L) { iproto_key_translation = mh_strnu32_new(); - - lua_getfield(L, LUA_GLOBALSINDEX, "box"); - lua_newtable(L); - + luaL_findtable(L, LUA_GLOBALSINDEX, "box.iproto", 0); push_iproto_constants(L); push_iproto_protocol_features(L); - - const struct luaL_Reg iproto_methods[] = { + static const struct luaL_Reg funcs[] = { {"send", lbox_iproto_send}, {"override", lbox_iproto_override}, {NULL, NULL} }; - luaL_register(L, NULL, iproto_methods); - - lua_setfield(L, -2, "iproto"); - lua_pop(L, 1); + luaL_setfuncs(L, funcs, 0); + luaL_findtable(L, -1, "internal", 0); + static const struct luaL_Reg internal_funcs[] = { + {"session_new", lbox_iproto_session_new}, + {NULL, NULL} + }; + luaL_setfuncs(L, internal_funcs, 0); + lua_pop(L, 1); /* box.iproto.internal */ + lua_pop(L, 1); /* box.iproto */ } /** diff --git a/src/box/lua/session.lua b/src/box/lua/session.lua index 78c01a508c..1864bd74c2 100644 --- a/src/box/lua/session.lua +++ b/src/box/lua/session.lua @@ -1,5 +1,7 @@ -- session.lua +local utils = require('internal.utils') + local session = box.session setmetatable(session, { @@ -21,3 +23,26 @@ setmetatable(session, { aggregate_storage = {} }) + +local SESSION_NEW_OPTS = { + type = 'string', + fd = 'number', + user = 'string', + storage = 'table' +} + +session.new = function(opts) + opts = opts or {} + utils.check_param_table(opts, SESSION_NEW_OPTS) + if opts.type ~= nil and opts.type ~= 'binary' then + box.error(box.error.ILLEGAL_PARAMS, + "invalid session type '" .. opts.type .. "', " .. + "the only supported type is 'binary'") + end + local sid = box.iproto.internal.session_new(opts.fd, opts.user) + -- It's okay to set the session storage after creating the session + -- because session_new doesn't yield so no one could possibly access + -- the uninitialized storage yet. + assert(session.exists(sid)) + getmetatable(session).aggregate_storage[sid] = opts.storage +end diff --git a/test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua b/test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua index 35ad86f082..78fdb581d2 100644 --- a/test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua +++ b/test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua @@ -190,12 +190,8 @@ local reference_table = { -- Checks that IPROTO constants and features are exported correctly. g.test_iproto_constants_and_features_export = function(cg) cg.server:exec(function(reference_table) - for k, v in pairs(box.iproto) do - local v_type = type(v) - if v_type ~= 'function' and v_type ~= 'thread' and - v_type ~= 'userdata' then - t.assert_equals(v, reference_table[k]) - end + for k, v in pairs(reference_table) do + t.assert_equals(box.iproto[k], v) end end, {reference_table}) end diff --git a/test/box-luatest/gh_8801_iproto_session_from_fd_test.lua b/test/box-luatest/gh_8801_iproto_session_from_fd_test.lua new file mode 100644 index 0000000000..f29ac19c3a --- /dev/null +++ b/test/box-luatest/gh_8801_iproto_session_from_fd_test.lua @@ -0,0 +1,220 @@ +local fio = require('fio') +local net = require('net.box') +local server = require('luatest.server') +local t = require('luatest') + +local g = t.group() + +local SOCK_PATH = fio.pathjoin(server.vardir, 'gh-8801.sock') + +g.before_all(function(cg) + cg.server = server:new({box_cfg = {iproto_threads = 4}}) + cg.server:start() + + -- Start a TCP server listening on SOCK_PATH. + -- + -- The server handler will accept all incoming connections with + -- box.session.new(opts). + cg.start_listen = function(opts) + cg.server:exec(function(sock_path, opts) + local socket = require('socket') + t.assert_not(rawget(_G, 'listen_sock')) + local function handler(sock) + local opts2 = opts and table.copy(opts) or {} + opts2.fd = sock:fd() + box.session.new(opts2) + sock:detach() + end + local listen_sock = socket.tcp_server('unix/', sock_path, handler) + t.assert(listen_sock) + rawset(_G, 'listen_sock', listen_sock) + end, {SOCK_PATH, opts}) + end + + -- Stop the server started with start_listen. + cg.stop_listen = function() + cg.server:exec(function() + local listen_sock = rawget(_G, 'listen_sock') + if listen_sock then + listen_sock:close() + rawset(_G, 'listen_sock', nil) + end + end) + end +end) + +g.after_all(function(cg) + cg.server:drop() +end) + +g.after_each(function(cg) + cg.stop_listen() +end) + +-- Checks that box.cfg() must be called. +g.test_no_cfg = function() + t.assert_error_msg_equals("Please call box.cfg{} first", + box.session.new, {fd = 0}) +end + +-- Checks errors raised on invalid arguments. +g.test_invalid_args = function(cg) + cg.server:exec(function() + t.assert_error_msg_equals( + "Illegal parameters, options should be a table", + box.session.new, 'foo') + t.assert_error_msg_equals( + "Illegal parameters, unexpected option 'foo'", + box.session.new, {foo = 'bar'}) + t.assert_error_msg_equals( + "Illegal parameters, " .. + "options parameter 'type' should be of type string", + box.session.new, {type = 0}) + t.assert_error_msg_equals( + "Illegal parameters, invalid session type 'foo', " .. + "the only supported type is 'binary'", + box.session.new, {type = 'foo'}) + t.assert_error_msg_equals( + "Illegal parameters, options parameter 'fd' is mandatory", + box.session.new, {}) + t.assert_error_msg_equals( + "Illegal parameters, " .. + "options parameter 'fd' should be of type number", + box.session.new, {fd = 'foo'}) + t.assert_error_msg_equals( + "Illegal parameters, " .. + "options parameter 'fd' must be nonnegative integer", + box.session.new, {fd = -1}) + t.assert_error_msg_equals( + "Illegal parameters, " .. + "options parameter 'fd' must be nonnegative integer", + box.session.new, {fd = 2 ^ 31}) + t.assert_error_msg_equals( + "Illegal parameters, " .. + "options parameter 'fd' must be nonnegative integer", + box.session.new, {fd = 1.5}) + t.assert_error_msg_equals( + "Illegal parameters, " .. + "options parameter 'user' should be of type string", + box.session.new, {user = 0}) + t.assert_error_msg_equals( + "User 'foo' is not found", + box.session.new, {fd = 0, user = 'foo'}) + t.assert_error_msg_equals( + "Illegal parameters, " .. + "options parameter 'storage' should be of type table", + box.session.new, {storage = 'foo'}) + end) +end + +-- Checks default options. +g.test_defaults = function(cg) + cg.start_listen() + local conn = net.connect(SOCK_PATH) + t.assert(conn.state, 'active') + t.assert_equals(conn:call('box.session.type'), 'binary') + t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)') + t.assert_equals(conn:call('box.session.user'), 'guest') + t.assert_equals(conn:eval('return box.session.storage'), {}) + conn:close() +end + +-- Checks that one may specify a custom session user. +g.test_custom_user = function(cg) + cg.start_listen({user = 'admin'}) + local conn = net.connect(SOCK_PATH) + t.assert(conn.state, 'active') + t.assert_equals(conn:call('box.session.type'), 'binary') + t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)') + t.assert_equals(conn:call('box.session.user'), 'admin') + t.assert_equals(conn:eval('return box.session.storage'), {}) + conn:close() +end + +-- Checks that one may override a custom session user by passing credentials. +g.test_custom_user_override = function(cg) + cg.start_listen({user = 'admin'}) + local conn = net.connect(SOCK_PATH, {user = 'guest'}) + t.assert(conn.state, 'active') + t.assert_equals(conn:call('box.session.type'), 'binary') + t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)') + t.assert_equals(conn:call('box.session.user'), 'guest') + t.assert_equals(conn:eval('return box.session.storage'), {}) + conn:close() +end + +-- Checks that one may specify a custom session storage. +g.test_custom_storage = function(cg) + local storage = {foo = 1, bar = 2} + cg.start_listen({storage = storage}) + local conn = net.connect(SOCK_PATH) + t.assert(conn.state, 'active') + t.assert_equals(conn:call('box.session.type'), 'binary') + t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)') + t.assert_equals(conn:call('box.session.user'), 'guest') + t.assert_equals(conn:eval('return box.session.storage'), storage) + conn:close() +end + +-- Checks that one may specify the 'binary' session type explicitly. +g.test_session_type = function(cg) + cg.start_listen({type = 'binary'}) + local conn = net.connect(SOCK_PATH) + t.assert(conn.state, 'active') + t.assert_equals(conn:call('box.session.type'), 'binary') + t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)') + t.assert_equals(conn:call('box.session.user'), 'guest') + t.assert_equals(conn:eval('return box.session.storage'), {}) + conn:close() +end + +-- Checks that connections are distributed evenly among all threads. +g.test_threads = function(cg) + cg.start_listen() + local COUNT = 20 + local conns = {} + for i = 1, COUNT do + conns[i] = net.connect(SOCK_PATH) + t.assert(conns[i].state, 'active') + end + cg.server:exec(function(COUNT) + t.assert(box.cfg.iproto_threads > 1) + for i = 1, box.cfg.iproto_threads do + t.assert_ge(box.stat.net.thread[i].CONNECTIONS.current, + COUNT / box.cfg.iproto_threads) + end + end, {COUNT}) + for i = 1, COUNT do + conns[i]:close() + end +end + +g.after_test('test_invalid_fd', function(cg) + cg.server:exec(function() + for _, func in ipairs(box.session.on_connect()) do + box.session.on_connect(nil, func) + end + t.assert_equals(box.session.on_connect(), {}) + end) +end) + +-- Checks that the session created from an invalid fd is closed. +g.test_invalid_fd = function(cg) + cg.server:exec(function() + local sid, fd, peer + box.session.on_connect(function() + sid = box.session.id() + fd = box.session.fd() + peer = box.session.peer() + end) + box.session.new({fd = 9000}) + t.helpers.retrying({}, function() + t.assert_is_not(sid, nil) + end) + t.assert_equals(fd, 9000) + t.assert_is(peer, nil) + t.helpers.retrying({}, function() + t.assert_not(box.session.exists(sid)) + end) + end) +end -- GitLab