diff --git a/changelogs/unreleased/gh-8801-iproto-session-from-fd.md b/changelogs/unreleased/gh-8801-iproto-session-from-fd.md
new file mode 100644
index 0000000000000000000000000000000000000000..b427c5a0d3589e3d34d1c15e5d4421348d9e7983
--- /dev/null
+++ b/changelogs/unreleased/gh-8801-iproto-session-from-fd.md
@@ -0,0 +1,4 @@
+## feature/box
+
+* Introduced the new function `box.session.new` for creating a new IPROTO
+  session from a socket file descriptor number (gh-8801).
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index d0ec171468c7913c52cb232de0096a55c641e4ae..6af38911a20b32c6f666f20a7910a3d7c2393add 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -358,6 +358,12 @@ struct iproto_msg
 			};
 			/** Peer address size. */
 			socklen_t addrlen;
+			/**
+			 * Session to use for the new connection.
+			 * Optional. If omitted, a new session object
+			 * will be created in the TX thread.
+			 */
+			struct session *session;
 		} connect;
 		/** Box request, if this is a DML */
 		struct request dml;
@@ -2760,7 +2766,12 @@ tx_process_connect(struct cmsg *m)
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = msg->connection->tx.p_obuf;
-	con->session = session_new(SESSION_TYPE_BINARY);
+	if (msg->connect.session != NULL) {
+		con->session = msg->connect.session;
+		session_set_type(con->session, SESSION_TYPE_BINARY);
+	} else {
+		con->session = session_new(SESSION_TYPE_BINARY);
+	}
 	con->session->meta.connection = con;
 	session_set_peer_addr(con->session, &msg->connect.addr,
 			      msg->connect.addrlen);
@@ -2824,18 +2835,23 @@ net_send_greeting(struct cmsg *m)
 
 /**
  * Create a connection and start input.
+ *
+ * If session is NULL, a new session object will be created for the connection
+ * in the TX thread.
+ *
+ * The function takes ownership of the passed IO stream and session.
  */
 static void
-iproto_on_accept(struct evio_service *service, struct iostream *io,
-		 struct sockaddr *addr, socklen_t addrlen)
+iproto_thread_accept(struct iproto_thread *iproto_thread, struct iostream *io,
+		     struct sockaddr *addr, socklen_t addrlen,
+		     struct session *session)
 {
-	struct iproto_thread *iproto_thread =
-		(struct iproto_thread *)service->on_accept_param;
 	struct iproto_connection *con = iproto_connection_new(iproto_thread);
 	struct iproto_msg *msg = iproto_msg_new(con);
 	assert(addrlen <= sizeof(msg->connect.addrstorage));
 	memcpy(&msg->connect.addrstorage, addr, addrlen);
 	msg->connect.addrlen = addrlen;
+	msg->connect.session = session;
 	iostream_move(&con->io, io);
 	cmsg_init(&msg->base, iproto_thread->connect_route);
 	msg->p_ibuf = con->p_ibuf;
@@ -2843,6 +2859,16 @@ iproto_on_accept(struct evio_service *service, struct iostream *io,
 	cpipe_push(&iproto_thread->tx_pipe, &msg->base);
 }
 
+static void
+iproto_on_accept_cb(struct evio_service *service, struct iostream *io,
+		    struct sockaddr *addr, socklen_t addrlen)
+{
+	struct iproto_thread *iproto_thread =
+		(struct iproto_thread *)service->on_accept_param;
+	iproto_thread_accept(iproto_thread, io, addr, addrlen,
+			     /*session=*/NULL);
+}
+
 /**
  * The network io thread main function:
  * begin serving the message bus.
@@ -2861,7 +2887,7 @@ net_cord_f(va_list  ap)
 		       sizeof(struct iproto_stream));
 
 	evio_service_create(loop(), &iproto_thread->binary, "binary",
-			    iproto_on_accept, iproto_thread);
+			    iproto_on_accept_cb, iproto_thread);
 
 	char endpoint_name[ENDPOINT_NAME_MAX];
 	snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u",
@@ -3199,6 +3225,10 @@ enum iproto_cfg_op {
 	 * reset.
 	 */
 	IPROTO_CFG_OVERRIDE,
+	/**
+	 * Command code to create a new IPROTO session.
+	 */
+	IPROTO_CFG_SESSION_NEW,
 };
 
 /**
@@ -3216,6 +3246,12 @@ struct iproto_cfg_msg: public cbus_call_msg
 		struct iproto_stats *stats;
 		/** New iproto max message count. */
 		int iproto_msg_max;
+		struct {
+			/** New connection IO stream. */
+			struct iostream io;
+			/** New connection session. */
+			struct session *session;
+		} session_new;
 		struct {
 			/** Overridden request type. */
 			uint32_t req_type;
@@ -3296,13 +3332,29 @@ iproto_do_cfg_f(struct cbus_call_msg *m)
 			mh_i32_del(req_handlers, k, NULL);
 		}
 		break;
+	case IPROTO_CFG_SESSION_NEW: {
+		struct iostream *io = &cfg_msg->session_new.io;
+		struct session *session = cfg_msg->session_new.session;
+		struct sockaddr_storage addrstorage;
+		struct sockaddr *addr = (struct sockaddr *)&addrstorage;
+		socklen_t addrlen = sizeof(addrstorage);
+		if (sio_getpeername(io->fd, addr, &addrlen) != 0)
+			addrlen = 0;
+		iproto_thread_accept(iproto_thread, io, addr, addrlen, session);
+		break;
+	}
 	default:
 		unreachable();
 	}
 	return 0;
 }
 
-static inline void
+/**
+ * Sends a configuration message to an IPROTO thread and waits for completion.
+ *
+ * The message may be allocated on stack.
+ */
+static void
 iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg)
 {
 	msg->iproto_thread = iproto_thread;
@@ -3312,6 +3364,28 @@ iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg)
 	(void)rc;
 }
 
+static int
+iproto_do_cfg_async_free_f(struct cbus_call_msg *m)
+{
+	free(m);
+	return 0;
+}
+
+/**
+ * Sends a configuration message to an IPROTO thread without waiting for
+ * completion.
+ *
+ * The message must be allocated with malloc.
+ */
+static void
+iproto_do_cfg_async(struct iproto_thread *iproto_thread,
+		    struct iproto_cfg_msg *msg)
+{
+	msg->iproto_thread = iproto_thread;
+	cbus_call_async(&iproto_thread->net_pipe, &iproto_thread->tx_pipe,
+			msg, iproto_do_cfg_f, iproto_do_cfg_async_free_f);
+}
+
 /** Send IPROTO_CFG_STOP to all threads. */
 static void
 iproto_send_stop_msg(void)
@@ -3451,6 +3525,24 @@ iproto_set_msg_max(int new_iproto_msg_max)
 	return 0;
 }
 
+uint64_t
+iproto_session_new(struct iostream *io, struct user *user)
+{
+	assert(iostream_is_initialized(io));
+	struct session *session = session_new(SESSION_TYPE_BACKGROUND);
+	if (user != NULL)
+		credentials_reset(&session->credentials, user);
+	struct iproto_cfg_msg *cfg_msg =
+		(struct iproto_cfg_msg *)xmalloc(sizeof(*cfg_msg));
+	iproto_cfg_msg_create(cfg_msg, IPROTO_CFG_SESSION_NEW);
+	iostream_move(&cfg_msg->session_new.io, io);
+	cfg_msg->session_new.session = session;
+	static int thread = 0;
+	thread = (thread + 1) % iproto_threads_count;
+	iproto_do_cfg_async(&iproto_threads[thread], cfg_msg);
+	return session->id;
+}
+
 /**
  * Notifies IPROTO threads that a new request handler has been set.
  */
diff --git a/src/box/iproto.h b/src/box/iproto.h
index 9ce71927ec6ef835adf1c169a8965a34d3116bf6..56ad37872177afa16ed8452df1601a8d18629456 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -32,11 +32,14 @@
  */
 
 #include <stddef.h>
+#include <stdint.h>
 
 #include "box/box.h"
 
 struct uri_set;
 struct session;
+struct user;
+struct iostream;
 
 #if defined(__cplusplus)
 extern "C" {
@@ -141,6 +144,28 @@ iproto_listen(const struct uri_set *uri_set);
 int
 iproto_set_msg_max(int iproto_msg_max);
 
+/**
+ * Creates a new IPROTO session over the given IO stream and returns the new
+ * session id. Never fails. Doesn't yield.
+ *
+ * The IO stream must refer to a non-blocking socket but this isn't enforced by
+ * this function. If it isn't so, the new connection may not work as expected.
+ *
+ * If the user argument isn't NULL, the new session will be authenticated as
+ * the specified user. Otherwise, it will be authenticated as guest.
+ *
+ * The function takes ownership of the passed IO stream by moving it to the
+ * new IPROTO connection (see iostream_move).
+ *
+ * Essentially, this function passes the IO stream to the callback invoked
+ * by an IPROTO thread upon accepting a new connection on a listening socket.
+ * The callback creates a new IPROTO connection, attaches it to the given
+ * session, then sends the greeting message and starts processing requests as
+ * usual. All of this is done asynchronously by an IPROTO thread.
+ */
+uint64_t
+iproto_session_new(struct iostream *io, struct user *user);
+
 /**
  * Sends a packet with the given header and body over the IPROTO session's
  * socket.
diff --git a/src/box/lua/iproto.c b/src/box/lua/iproto.c
index 5fc19ecedd6a2bb0e2b5bdf232af724c72de6f36..e114ecfbd4365452a7a77eef88b75e831be77103 100644
--- a/src/box/lua/iproto.c
+++ b/src/box/lua/iproto.c
@@ -11,8 +11,10 @@
 #include "box/iproto.h"
 #include "box/iproto_constants.h"
 #include "box/iproto_features.h"
+#include "box/user.h"
 
 #include "core/assoc.h"
+#include "core/iostream.h"
 #include "core/fiber.h"
 #include "core/tt_static.h"
 
@@ -178,6 +180,45 @@ push_iproto_protocol_features(struct lua_State *L)
 	lua_setfield(L, -2, "feature");
 }
 
+/**
+ * Internal Lua wrapper around iproto_session_new.
+ *
+ * Takes fd number (mandatory) and user name (optional, default is guest).
+ * Returns the new session id on success. On error, raises an exception.
+ */
+static int
+lbox_iproto_session_new(struct lua_State *L)
+{
+	if (lua_isnoneornil(L, 1)) {
+		diag_set(ClientError, ER_ILLEGAL_PARAMS,
+			 "options parameter 'fd' is mandatory");
+		return luaT_error(L);
+	}
+	int fd;
+	if (!luaL_tointeger_strict(L, 1, &fd) || fd < 0) {
+		diag_set(ClientError, ER_ILLEGAL_PARAMS,
+			 "options parameter 'fd' must be nonnegative integer");
+		return luaT_error(L);
+	}
+	if (!box_is_configured()) {
+		diag_set(ClientError, ER_UNCONFIGURED);
+		return luaT_error(L);
+	}
+	struct user *user = NULL;
+	if (!lua_isnil(L, 2)) {
+		size_t name_len;
+		const char *name = luaL_checklstring(L, 2, &name_len);
+		user = user_find_by_name(name, name_len);
+		if (user == NULL)
+			return luaT_error(L);
+	}
+	struct iostream io;
+	plain_iostream_create(&io, fd);
+	uint64_t sid = iproto_session_new(&io, user);
+	luaL_pushuint64(L, sid);
+	return 1;
+}
+
 /**
  * Encodes a packet header/body argument to MsgPack: if the argument is a
  * string, then no encoding is needed — otherwise the argument must be a Lua
@@ -340,22 +381,23 @@ void
 box_lua_iproto_init(struct lua_State *L)
 {
 	iproto_key_translation = mh_strnu32_new();
-
-	lua_getfield(L, LUA_GLOBALSINDEX, "box");
-	lua_newtable(L);
-
+	luaL_findtable(L, LUA_GLOBALSINDEX, "box.iproto", 0);
 	push_iproto_constants(L);
 	push_iproto_protocol_features(L);
-
-	const struct luaL_Reg iproto_methods[] = {
+	static const struct luaL_Reg funcs[] = {
 		{"send", lbox_iproto_send},
 		{"override", lbox_iproto_override},
 		{NULL, NULL}
 	};
-	luaL_register(L, NULL, iproto_methods);
-
-	lua_setfield(L, -2, "iproto");
-	lua_pop(L, 1);
+	luaL_setfuncs(L, funcs, 0);
+	luaL_findtable(L, -1, "internal", 0);
+	static const struct luaL_Reg internal_funcs[] = {
+		{"session_new", lbox_iproto_session_new},
+		{NULL, NULL}
+	};
+	luaL_setfuncs(L, internal_funcs, 0);
+	lua_pop(L, 1); /* box.iproto.internal */
+	lua_pop(L, 1); /* box.iproto */
 }
 
 /**
diff --git a/src/box/lua/session.lua b/src/box/lua/session.lua
index 78c01a508cbf7fa73de469e35ec7cd7ff4230227..1864bd74c2639b59f38f9f109200c9fd60207b14 100644
--- a/src/box/lua/session.lua
+++ b/src/box/lua/session.lua
@@ -1,5 +1,7 @@
 -- session.lua
 
+local utils = require('internal.utils')
+
 local session = box.session
 
 setmetatable(session, {
@@ -21,3 +23,26 @@ setmetatable(session, {
 
     aggregate_storage = {}
 })
+
+local SESSION_NEW_OPTS = {
+    type = 'string',
+    fd = 'number',
+    user = 'string',
+    storage = 'table'
+}
+
+session.new = function(opts)
+    opts = opts or {}
+    utils.check_param_table(opts, SESSION_NEW_OPTS)
+    if opts.type ~= nil and opts.type ~= 'binary' then
+        box.error(box.error.ILLEGAL_PARAMS,
+                  "invalid session type '" .. opts.type .. "', " ..
+                  "the only supported type is 'binary'")
+    end
+    local sid = box.iproto.internal.session_new(opts.fd, opts.user)
+    -- It's okay to set the session storage after creating the session
+    -- because session_new doesn't yield so no one could possibly access
+    -- the uninitialized storage yet.
+    assert(session.exists(sid))
+    getmetatable(session).aggregate_storage[sid] = opts.storage
+end
diff --git a/test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua b/test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua
index 35ad86f082a070e7889d92972566afbd816eccfc..78fdb581d2ebe6615295472452b91a984370a3c8 100644
--- a/test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua
+++ b/test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua
@@ -190,12 +190,8 @@ local reference_table = {
 -- Checks that IPROTO constants and features are exported correctly.
 g.test_iproto_constants_and_features_export = function(cg)
     cg.server:exec(function(reference_table)
-        for k, v in pairs(box.iproto) do
-            local v_type = type(v)
-            if v_type ~= 'function' and v_type ~= 'thread' and
-               v_type ~= 'userdata' then
-                t.assert_equals(v, reference_table[k])
-            end
+        for k, v in pairs(reference_table) do
+            t.assert_equals(box.iproto[k], v)
         end
     end, {reference_table})
 end
diff --git a/test/box-luatest/gh_8801_iproto_session_from_fd_test.lua b/test/box-luatest/gh_8801_iproto_session_from_fd_test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..f29ac19c3a669171f99aa131928f56e0411486f3
--- /dev/null
+++ b/test/box-luatest/gh_8801_iproto_session_from_fd_test.lua
@@ -0,0 +1,220 @@
+local fio = require('fio')
+local net = require('net.box')
+local server = require('luatest.server')
+local t = require('luatest')
+
+local g = t.group()
+
+local SOCK_PATH = fio.pathjoin(server.vardir, 'gh-8801.sock')
+
+g.before_all(function(cg)
+    cg.server = server:new({box_cfg = {iproto_threads = 4}})
+    cg.server:start()
+
+    -- Start a TCP server listening on SOCK_PATH.
+    --
+    -- The server handler will accept all incoming connections with
+    -- box.session.new(opts).
+    cg.start_listen = function(opts)
+        cg.server:exec(function(sock_path, opts)
+            local socket = require('socket')
+            t.assert_not(rawget(_G, 'listen_sock'))
+            local function handler(sock)
+                local opts2 = opts and table.copy(opts) or {}
+                opts2.fd = sock:fd()
+                box.session.new(opts2)
+                sock:detach()
+            end
+            local listen_sock = socket.tcp_server('unix/', sock_path, handler)
+            t.assert(listen_sock)
+            rawset(_G, 'listen_sock', listen_sock)
+        end, {SOCK_PATH, opts})
+    end
+
+    -- Stop the server started with start_listen.
+    cg.stop_listen = function()
+        cg.server:exec(function()
+            local listen_sock = rawget(_G, 'listen_sock')
+            if listen_sock then
+                listen_sock:close()
+                rawset(_G, 'listen_sock', nil)
+            end
+        end)
+    end
+end)
+
+g.after_all(function(cg)
+    cg.server:drop()
+end)
+
+g.after_each(function(cg)
+    cg.stop_listen()
+end)
+
+-- Checks that box.cfg() must be called.
+g.test_no_cfg = function()
+    t.assert_error_msg_equals("Please call box.cfg{} first",
+                              box.session.new, {fd = 0})
+end
+
+-- Checks errors raised on invalid arguments.
+g.test_invalid_args = function(cg)
+    cg.server:exec(function()
+        t.assert_error_msg_equals(
+            "Illegal parameters, options should be a table",
+            box.session.new, 'foo')
+        t.assert_error_msg_equals(
+            "Illegal parameters, unexpected option 'foo'",
+            box.session.new, {foo = 'bar'})
+        t.assert_error_msg_equals(
+            "Illegal parameters, " ..
+            "options parameter 'type' should be of type string",
+            box.session.new, {type = 0})
+        t.assert_error_msg_equals(
+            "Illegal parameters, invalid session type 'foo', " ..
+            "the only supported type is 'binary'",
+            box.session.new, {type = 'foo'})
+        t.assert_error_msg_equals(
+            "Illegal parameters, options parameter 'fd' is mandatory",
+            box.session.new, {})
+        t.assert_error_msg_equals(
+            "Illegal parameters, " ..
+            "options parameter 'fd' should be of type number",
+            box.session.new, {fd = 'foo'})
+        t.assert_error_msg_equals(
+            "Illegal parameters, " ..
+            "options parameter 'fd' must be nonnegative integer",
+            box.session.new, {fd = -1})
+        t.assert_error_msg_equals(
+            "Illegal parameters, " ..
+            "options parameter 'fd' must be nonnegative integer",
+            box.session.new, {fd = 2 ^ 31})
+        t.assert_error_msg_equals(
+            "Illegal parameters, " ..
+            "options parameter 'fd' must be nonnegative integer",
+            box.session.new, {fd = 1.5})
+        t.assert_error_msg_equals(
+            "Illegal parameters, " ..
+            "options parameter 'user' should be of type string",
+            box.session.new, {user = 0})
+        t.assert_error_msg_equals(
+            "User 'foo' is not found",
+            box.session.new, {fd = 0, user = 'foo'})
+        t.assert_error_msg_equals(
+            "Illegal parameters, " ..
+            "options parameter 'storage' should be of type table",
+            box.session.new, {storage = 'foo'})
+    end)
+end
+
+-- Checks default options.
+g.test_defaults = function(cg)
+    cg.start_listen()
+    local conn = net.connect(SOCK_PATH)
+    t.assert(conn.state, 'active')
+    t.assert_equals(conn:call('box.session.type'), 'binary')
+    t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)')
+    t.assert_equals(conn:call('box.session.user'), 'guest')
+    t.assert_equals(conn:eval('return box.session.storage'), {})
+    conn:close()
+end
+
+-- Checks that one may specify a custom session user.
+g.test_custom_user = function(cg)
+    cg.start_listen({user = 'admin'})
+    local conn = net.connect(SOCK_PATH)
+    t.assert(conn.state, 'active')
+    t.assert_equals(conn:call('box.session.type'), 'binary')
+    t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)')
+    t.assert_equals(conn:call('box.session.user'), 'admin')
+    t.assert_equals(conn:eval('return box.session.storage'), {})
+    conn:close()
+end
+
+-- Checks that one may override a custom session user by passing credentials.
+g.test_custom_user_override = function(cg)
+    cg.start_listen({user = 'admin'})
+    local conn = net.connect(SOCK_PATH, {user = 'guest'})
+    t.assert(conn.state, 'active')
+    t.assert_equals(conn:call('box.session.type'), 'binary')
+    t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)')
+    t.assert_equals(conn:call('box.session.user'), 'guest')
+    t.assert_equals(conn:eval('return box.session.storage'), {})
+    conn:close()
+end
+
+-- Checks that one may specify a custom session storage.
+g.test_custom_storage = function(cg)
+    local storage = {foo = 1, bar = 2}
+    cg.start_listen({storage = storage})
+    local conn = net.connect(SOCK_PATH)
+    t.assert(conn.state, 'active')
+    t.assert_equals(conn:call('box.session.type'), 'binary')
+    t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)')
+    t.assert_equals(conn:call('box.session.user'), 'guest')
+    t.assert_equals(conn:eval('return box.session.storage'), storage)
+    conn:close()
+end
+
+-- Checks that one may specify the 'binary' session type explicitly.
+g.test_session_type = function(cg)
+    cg.start_listen({type = 'binary'})
+    local conn = net.connect(SOCK_PATH)
+    t.assert(conn.state, 'active')
+    t.assert_equals(conn:call('box.session.type'), 'binary')
+    t.assert_equals(conn:call('box.session.peer'), 'unix/:(socket)')
+    t.assert_equals(conn:call('box.session.user'), 'guest')
+    t.assert_equals(conn:eval('return box.session.storage'), {})
+    conn:close()
+end
+
+-- Checks that connections are distributed evenly among all threads.
+g.test_threads = function(cg)
+    cg.start_listen()
+    local COUNT = 20
+    local conns = {}
+    for i = 1, COUNT do
+        conns[i] = net.connect(SOCK_PATH)
+        t.assert(conns[i].state, 'active')
+    end
+    cg.server:exec(function(COUNT)
+        t.assert(box.cfg.iproto_threads > 1)
+        for i = 1, box.cfg.iproto_threads do
+            t.assert_ge(box.stat.net.thread[i].CONNECTIONS.current,
+                        COUNT / box.cfg.iproto_threads)
+        end
+    end, {COUNT})
+    for i = 1, COUNT do
+        conns[i]:close()
+    end
+end
+
+g.after_test('test_invalid_fd', function(cg)
+    cg.server:exec(function()
+        for _, func in ipairs(box.session.on_connect()) do
+            box.session.on_connect(nil, func)
+        end
+        t.assert_equals(box.session.on_connect(), {})
+    end)
+end)
+
+-- Checks that the session created from an invalid fd is closed.
+g.test_invalid_fd = function(cg)
+    cg.server:exec(function()
+        local sid, fd, peer
+        box.session.on_connect(function()
+            sid = box.session.id()
+            fd = box.session.fd()
+            peer = box.session.peer()
+        end)
+        box.session.new({fd = 9000})
+        t.helpers.retrying({}, function()
+            t.assert_is_not(sid, nil)
+        end)
+        t.assert_equals(fd, 9000)
+        t.assert_is(peer, nil)
+        t.helpers.retrying({}, function()
+            t.assert_not(box.session.exists(sid))
+        end)
+    end)
+end