From 2ede3be3564f04ba2c80ed454a2ba6b0ca11450d Mon Sep 17 00:00:00 2001
From: mechanik20051988 <mechanik20.05.1988@gmail.com>
Date: Tue, 13 Apr 2021 22:18:55 +0300
Subject: [PATCH] iproto: implement ability to run multiple iproto threads

Some users have workloads where the iproto thread is the throughput
bottleneck: the iproto thread's core is 100% loaded while the TX
thread's core is not. For such cases it is useful to be able to
create several iproto threads.

Closes #5645

@TarantoolBot document
Title: ability to run multiple iproto threads
Implement the ability to run multiple iproto threads, which is useful
for specific workloads where a single iproto thread becomes the
throughput bottleneck. To set the number of iproto threads, use the
`iproto_threads` option in `box.cfg`. For example, to start 8 iproto
threads, call `box.cfg{iproto_threads = 8}`. The default is 1. The
option is not dynamic: it cannot be changed after it is first set
without a server restart. The distribution of connections across
threads is managed by the OS kernel.
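
A minimal usage sketch (the port number below is an arbitrary example;
`iproto_threads` is the only new option):

```lua
-- The option takes effect at the first box.cfg{} call; it is not
-- dynamic and can only be changed with a server restart.
box.cfg{
    listen = 3301,
    iproto_threads = 8,
}

-- box.stat.net() reports statistics aggregated over all iproto threads.
box.stat.net()
```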
---
 .luacheckrc                                   |   6 +
 .../unreleased/several-iproto-threads.md      |   5 +
 src/box/box.cc                                |   2 +-
 src/box/iproto.cc                             | 729 ++++++++++++------
 src/box/iproto.h                              |   5 +-
 src/box/lua/load_cfg.lua                      |   2 +
 src/box/lua/stat.c                            |   6 +-
 src/lib/core/errinj.h                         |   1 +
 test/app-tap/init_script.result               |   1 +
 test/box/admin.result                         |   2 +
 test/box/cfg.result                           |   4 +
 test/box/errinj.result                        |   1 +
 test/box/gh-5645-several-iproto-threads.lua   |  17 +
 .../box/gh-5645-several-iproto-threads.result | 162 ++++
 .../gh-5645-several-iproto-threads.test.lua   |  96 +++
 test/box/suite.ini                            |   2 +-
 16 files changed, 795 insertions(+), 246 deletions(-)
 create mode 100644 changelogs/unreleased/several-iproto-threads.md
 create mode 100755 test/box/gh-5645-several-iproto-threads.lua
 create mode 100644 test/box/gh-5645-several-iproto-threads.result
 create mode 100755 test/box/gh-5645-several-iproto-threads.test.lua

diff --git a/.luacheckrc b/.luacheckrc
index 703f86dc58..80405eb217 100644
--- a/.luacheckrc
+++ b/.luacheckrc
@@ -76,6 +76,12 @@ files["test/box/box.lua"] = {
         "iproto_request",
     }
 }
+files["test/box/gh-5645-several-iproto-threads.lua"] = {
+    globals = {
+        "errinj_set",
+        "ping",
+    },
+}
 files["test/box-tap/session.test.lua"] = {
     globals = {
         "session",
diff --git a/changelogs/unreleased/several-iproto-threads.md b/changelogs/unreleased/several-iproto-threads.md
new file mode 100644
index 0000000000..84aa1a285f
--- /dev/null
+++ b/changelogs/unreleased/several-iproto-threads.md
@@ -0,0 +1,5 @@
+## feature/core
+
+* Implement the ability to run multiple iproto threads, which is useful
+  for specific workloads where a single iproto thread becomes the
+  throughput bottleneck (gh-5645).
\ No newline at end of file
diff --git a/src/box/box.cc b/src/box/box.cc
index b846ba8f5d..c60430a93c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -3229,7 +3229,7 @@ box_cfg_xc(void)
 	schema_init();
 	replication_init();
 	port_init();
-	iproto_init();
+	iproto_init(cfg_geti("iproto_threads"));
 	sql_init();
 
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index aabb1f33ae..4bb49d3de3 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -69,6 +69,10 @@ enum {
 	IPROTO_PACKET_SIZE_MAX = 2UL * 1024 * 1024 * 1024,
 };
 
+enum {
+	ENDPOINT_NAME_MAX = 10
+};
+
 /**
  * A position in connection output buffer.
  * Since we use rotating buffers to recycle memory,
@@ -87,6 +91,71 @@ iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
 	wpos->svp = obuf_create_svp(out);
 }
 
+struct iproto_thread {
+	/**
+	 * Slab cache used for allocating memory for output network buffers
+	 * in the tx thread.
+	 */
+	struct slab_cache net_slabc;
+	/**
+	 * Network thread execution unit.
+	 */
+	struct cord net_cord;
+	/**
+	 * A single queue for all requests in all connections served by
+	 * this thread. All requests are processed concurrently.
+	 * Is also used as a queue for just established connections and to
+	 * execute disconnect triggers. A few notes about these triggers:
+	 * - they need to be run in a fiber
+	 * - unlike an ordinary request failure, on_connect trigger
+	 *   failure must lead to connection close.
+	 * - on_connect trigger must be processed before any other
+	 *   request on this connection.
+	 */
+	struct cpipe tx_pipe;
+	struct cpipe net_pipe;
+	/**
+	 * Static routes for this iproto thread
+	 */
+	struct cmsg_hop destroy_route[2];
+	struct cmsg_hop disconnect_route[2];
+	struct cmsg_hop misc_route[2];
+	struct cmsg_hop call_route[2];
+	struct cmsg_hop select_route[2];
+	struct cmsg_hop process1_route[2];
+	struct cmsg_hop sql_route[2];
+	struct cmsg_hop join_route[2];
+	struct cmsg_hop subscribe_route[2];
+	struct cmsg_hop error_route[2];
+	struct cmsg_hop push_route[2];
+	struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX];
+	struct cmsg_hop connect_route[2];
+	/**
+	 * Iproto thread memory pools.
+	 */
+	struct mempool iproto_msg_pool;
+	struct mempool iproto_connection_pool;
+	/**
+	 * List of stopped connections.
+	 */
+	struct rlist stopped_connections;
+	/**
+	 * Iproto thread statistics.
+	 */
+	struct rmean *rmean;
+	/**
+	 * Iproto thread id.
+	 */
+	uint32_t id;
+	/**
+	 * Iproto binary listener.
+	 */
+	struct evio_service binary;
+};
+
+static struct iproto_thread *iproto_threads;
+static int iproto_threads_count;
+
 /**
  * In Greek mythology, Kharon is the ferryman who carries souls
  * of the newly deceased across the river Styx that divided the
@@ -236,8 +305,6 @@ struct iproto_msg
 	bool close_connection;
 };
 
-static struct mempool iproto_msg_pool;
-
 static struct iproto_msg *
 iproto_msg_new(struct iproto_connection *con);
 
@@ -245,46 +312,12 @@ iproto_msg_new(struct iproto_connection *con);
  * Resume stopped connections, if any.
  */
 static void
-iproto_resume(void);
+iproto_resume(struct iproto_thread *iproto_thread);
 
 static void
 iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		  bool *stop_input);
 
-static inline void
-iproto_msg_delete(struct iproto_msg *msg)
-{
-	mempool_free(&iproto_msg_pool, msg);
-	iproto_resume();
-}
-
-/**
- * A single global queue for all requests in all connections. All
- * requests from all connections are processed concurrently.
- * Is also used as a queue for just established connections and to
- * execute disconnect triggers. A few notes about these triggers:
- * - they need to be run in a fiber
- * - unlike an ordinary request failure, on_connect trigger
- *   failure must lead to connection close.
- * - on_connect trigger must be processed before any other
- *   request on this connection.
- */
-static struct cpipe tx_pipe;
-static struct cpipe net_pipe;
-
-/**
- * Network thread.
- */
-static struct cord net_cord;
-
-/**
- * Slab cache used for allocating memory for output network buffers
- * in the tx thread.
- */
-static struct slab_cache net_slabc;
-
-struct rmean *rmean_net;
-
 enum rmean_net_name {
 	IPROTO_SENT,
 	IPROTO_RECEIVED,
@@ -306,11 +339,6 @@ tx_process_destroy(struct cmsg *m);
 static void
 net_finish_destroy(struct cmsg *m);
 
-static const struct cmsg_hop destroy_route[] = {
-	{ tx_process_destroy, &net_pipe },
-	{ net_finish_destroy, NULL },
-};
-
 /** Fire on_disconnect triggers in the tx thread. */
 static void
 tx_process_disconnect(struct cmsg *m);
@@ -319,11 +347,6 @@ tx_process_disconnect(struct cmsg *m);
 static void
 net_finish_disconnect(struct cmsg *m);
 
-static const struct cmsg_hop disconnect_route[] = {
-	{ tx_process_disconnect, &net_pipe },
-	{ net_finish_disconnect, NULL }
-};
-
 /**
  * Kharon is in the dead world (iproto). Schedule an event to
  * flush new obuf as reflected in the fresh wpos.
@@ -341,12 +364,6 @@ iproto_process_push(struct cmsg *m);
 static void
 tx_end_push(struct cmsg *m);
 
-static const struct cmsg_hop push_route[] = {
-	{ iproto_process_push, &tx_pipe },
-	{ tx_end_push, NULL }
-};
-
-
 /* }}} */
 
 /* {{{ iproto_connection - declaration and definition */
@@ -536,29 +553,37 @@ struct iproto_connection
 	} tx;
 	/** Authentication salt. */
 	char salt[IPROTO_SALT_SIZE];
+	/** Iproto connection thread */
+	struct iproto_thread *iproto_thread;
 };
 
-static struct mempool iproto_connection_pool;
-static RLIST_HEAD(stopped_connections);
-
 /**
  * Return true if we have not enough spare messages
  * in the message pool.
  */
 static inline bool
-iproto_check_msg_max(void)
+iproto_check_msg_max(struct iproto_thread *iproto_thread)
 {
-	size_t request_count = mempool_count(&iproto_msg_pool);
+	size_t request_count = mempool_count(&iproto_thread->iproto_msg_pool);
 	return request_count > (size_t) iproto_msg_max;
 }
 
+static inline void
+iproto_msg_delete(struct iproto_msg *msg)
+{
+	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
+	mempool_free(&iproto_thread->iproto_msg_pool, msg);
+	iproto_resume(iproto_thread);
+}
+
 static struct iproto_msg *
 iproto_msg_new(struct iproto_connection *con)
 {
+	struct mempool *iproto_msg_pool = &con->iproto_thread->iproto_msg_pool;
 	struct iproto_msg *msg =
-		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
+		(struct iproto_msg *) mempool_alloc(iproto_msg_pool);
 	ERROR_INJECT(ERRINJ_TESTING, {
-		mempool_free(&iproto_msg_pool, msg);
+		mempool_free(iproto_msg_pool, msg);
 		msg = NULL;
 	});
 	if (msg == NULL) {
@@ -569,7 +594,7 @@ iproto_msg_new(struct iproto_connection *con)
 	}
 	msg->close_connection = false;
 	msg->connection = con;
-	rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
+	rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
 	return msg;
 }
 
@@ -624,7 +649,8 @@ iproto_connection_stop_msg_max_limit(struct iproto_connection *con)
 	 * Important to add to tail and fetch from head to ensure
 	 * strict lifo order (fairness) for stopped connections.
 	 */
-	rlist_add_tail(&stopped_connections, &con->in_stop_list);
+	rlist_add_tail(&con->iproto_thread->stopped_connections,
+		       &con->in_stop_list);
 }
 
 /**
@@ -653,7 +679,7 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
 	 * other parts of the connection.
 	 */
 	con->state = IPROTO_CONNECTION_DESTROYED;
-	cpipe_push(&tx_pipe, &con->destroy_msg);
+	cpipe_push(&con->iproto_thread->tx_pipe, &con->destroy_msg);
 }
 
 /**
@@ -680,7 +706,7 @@ iproto_connection_close(struct iproto_connection *con)
 		 * is done only once.
 		 */
 		con->p_ibuf->wpos -= con->parse_size;
-		cpipe_push(&tx_pipe, &con->disconnect_msg);
+		cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
 		assert(con->state == IPROTO_CONNECTION_ALIVE);
 		con->state = IPROTO_CONNECTION_CLOSED;
 	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
@@ -814,9 +840,9 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 	bool stop_input = false;
 	const char *errmsg;
 	while (con->parse_size != 0 && !stop_input) {
-		if (iproto_check_msg_max()) {
+		if (iproto_check_msg_max(con->iproto_thread)) {
 			iproto_connection_stop_msg_max_limit(con);
-			cpipe_flush_input(&tx_pipe);
+			cpipe_flush_input(&con->iproto_thread->tx_pipe);
 			return 0;
 		}
 		const char *reqstart = in->wpos - con->parse_size;
@@ -825,7 +851,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		if (mp_typeof(*pos) != MP_UINT) {
 			errmsg = "packet length";
 err_msgpack:
-			cpipe_flush_input(&tx_pipe);
+			cpipe_flush_input(&con->iproto_thread->tx_pipe);
 			diag_set(ClientError, ER_INVALID_MSGPACK,
 				 errmsg);
 			return -1;
@@ -861,7 +887,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		 * This can't throw, but should not be
 		 * done in case of exception.
 		 */
-		cpipe_push_input(&tx_pipe, &msg->base);
+		cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
 		n_requests++;
 		/* Request is parsed */
 		assert(reqend > reqstart);
@@ -899,7 +925,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		 */
 		ev_feed_event(con->loop, &con->input, EV_READ);
 	}
-	cpipe_flush_input(&tx_pipe);
+	cpipe_flush_input(&con->iproto_thread->tx_pipe);
 	return 0;
 }
 
@@ -911,7 +937,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 static void
 iproto_connection_resume(struct iproto_connection *con)
 {
-	assert(! iproto_check_msg_max());
+	assert(! iproto_check_msg_max(con->iproto_thread));
 	rlist_del(&con->in_stop_list);
 	/*
 	 * Enqueue_batch() stops the connection again, if the
@@ -935,15 +961,16 @@ iproto_connection_resume(struct iproto_connection *con)
  * necessary to use up the limit.
  */
 static void
-iproto_resume(void)
+iproto_resume(struct iproto_thread *iproto_thread)
 {
-	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
+	while (!iproto_check_msg_max(iproto_thread) &&
+	       !rlist_empty(&iproto_thread->stopped_connections)) {
 		/*
 		 * Shift from list head to ensure strict FIFO
 		 * (fairness) for resumed connections.
 		 */
 		struct iproto_connection *con =
-			rlist_first_entry(&stopped_connections,
+			rlist_first_entry(&iproto_thread->stopped_connections,
 					  struct iproto_connection,
 					  in_stop_list);
 		iproto_connection_resume(con);
@@ -965,7 +992,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 	 * otherwise we might deplete the fiber pool in tx
 	 * thread and deadlock.
 	 */
-	if (iproto_check_msg_max()) {
+	if (iproto_check_msg_max(con->iproto_thread)) {
 		iproto_connection_stop_msg_max_limit(con);
 		return;
 	}
@@ -990,7 +1017,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 			return;
 		}
 		/* Count statistics */
-		rmean_collect(rmean_net, IPROTO_RECEIVED, nrd);
+		rmean_collect(con->iproto_thread->rmean,
+			      IPROTO_RECEIVED, nrd);
 
 		/* Update the read position and connection state. */
 		in->wpos += nrd;
@@ -1049,7 +1077,7 @@ iproto_flush(struct iproto_connection *con)
 
 	if (nwr > 0) {
 		/* Count statistics */
-		rmean_collect(rmean_net, IPROTO_SENT, nwr);
+		rmean_collect(con->iproto_thread->rmean, IPROTO_SENT, nwr);
 		if (begin->used + nwr == end->used) {
 			*begin = *end;
 			return 0;
@@ -1094,22 +1122,25 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
 }
 
 static struct iproto_connection *
-iproto_connection_new(int fd)
+iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
 {
 	struct iproto_connection *con = (struct iproto_connection *)
-		mempool_alloc(&iproto_connection_pool);
+		mempool_alloc(&iproto_thread->iproto_connection_pool);
 	if (con == NULL) {
 		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
 		return NULL;
 	}
+	con->iproto_thread = iproto_thread;
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
 	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
 	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
 	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
-	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
-	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
+	obuf_create(&con->obuf[0], &con->iproto_thread->net_slabc,
+		    iproto_readahead);
+	obuf_create(&con->obuf[1], &con->iproto_thread->net_slabc,
+		    iproto_readahead);
 	con->p_ibuf = &con->ibuf[0];
 	con->tx.p_obuf = &con->obuf[0];
 	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
@@ -1119,12 +1150,12 @@ iproto_connection_new(int fd)
 	con->session = NULL;
 	rlist_create(&con->in_stop_list);
 	/* It may be very awkward to allocate at close. */
-	cmsg_init(&con->destroy_msg, destroy_route);
-	cmsg_init(&con->disconnect_msg, disconnect_route);
+	cmsg_init(&con->destroy_msg, con->iproto_thread->destroy_route);
+	cmsg_init(&con->disconnect_msg, con->iproto_thread->disconnect_route);
 	con->state = IPROTO_CONNECTION_ALIVE;
 	con->tx.is_push_pending = false;
 	con->tx.is_push_sent = false;
-	rmean_collect(rmean_net, IPROTO_CONNECTIONS, 1);
+	rmean_collect(iproto_thread->rmean, IPROTO_CONNECTIONS, 1);
 	return con;
 }
 
@@ -1147,7 +1178,7 @@ iproto_connection_delete(struct iproto_connection *con)
 	       con->obuf[0].iov[0].iov_base == NULL);
 	assert(con->obuf[1].pos == 0 &&
 	       con->obuf[1].iov[0].iov_base == NULL);
-	mempool_free(&iproto_connection_pool, con);
+	mempool_free(&con->iproto_thread->iproto_connection_pool, con);
 }
 
 /* }}} iproto_connection */
@@ -1190,68 +1221,12 @@ net_end_join(struct cmsg *msg);
 static void
 net_end_subscribe(struct cmsg *msg);
 
-static const struct cmsg_hop misc_route[] = {
-	{ tx_process_misc, &net_pipe },
-	{ net_send_msg, NULL },
-};
-
-static const struct cmsg_hop call_route[] = {
-	{ tx_process_call, &net_pipe },
-	{ net_send_msg, NULL },
-};
-
-static const struct cmsg_hop select_route[] = {
-	{ tx_process_select, &net_pipe },
-	{ net_send_msg, NULL },
-};
-
-static const struct cmsg_hop process1_route[] = {
-	{ tx_process1, &net_pipe },
-	{ net_send_msg, NULL },
-};
-
-static const struct cmsg_hop sql_route[] = {
-	{ tx_process_sql, &net_pipe },
-	{ net_send_msg, NULL },
-};
-
-static const struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX] = {
-	NULL,                                   /* IPROTO_OK */
-	select_route,                           /* IPROTO_SELECT */
-	process1_route,                         /* IPROTO_INSERT */
-	process1_route,                         /* IPROTO_REPLACE */
-	process1_route,                         /* IPROTO_UPDATE */
-	process1_route,                         /* IPROTO_DELETE */
-	call_route,                             /* IPROTO_CALL_16 */
-	misc_route,                             /* IPROTO_AUTH */
-	call_route,                             /* IPROTO_EVAL */
-	process1_route,                         /* IPROTO_UPSERT */
-	call_route,                             /* IPROTO_CALL */
-	sql_route,                              /* IPROTO_EXECUTE */
-	NULL,                                   /* IPROTO_NOP */
-	sql_route,                              /* IPROTO_PREPARE */
-};
-
-static const struct cmsg_hop join_route[] = {
-	{ tx_process_replication, &net_pipe },
-	{ net_end_join, NULL },
-};
-
-static const struct cmsg_hop subscribe_route[] = {
-	{ tx_process_replication, &net_pipe },
-	{ net_end_subscribe, NULL },
-};
-
-static const struct cmsg_hop error_route[] = {
-	{ tx_reply_iproto_error, &net_pipe },
-	{ net_send_error, NULL },
-};
-
 static void
 iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		  bool *stop_input)
 {
 	uint8_t type;
+	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
 
 	if (xrow_header_decode(&msg->header, pos, reqend, true))
 		goto error;
@@ -1274,43 +1249,44 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		if (xrow_decode_dml(&msg->header, &msg->dml,
 				    dml_request_key_map(type)))
 			goto error;
-		assert(type < sizeof(dml_route)/sizeof(*dml_route));
-		cmsg_init(&msg->base, dml_route[type]);
+		assert(type < sizeof(iproto_thread->dml_route) /
+		              sizeof(*(iproto_thread->dml_route)));
+		cmsg_init(&msg->base, iproto_thread->dml_route[type]);
 		break;
 	case IPROTO_CALL_16:
 	case IPROTO_CALL:
 	case IPROTO_EVAL:
 		if (xrow_decode_call(&msg->header, &msg->call))
 			goto error;
-		cmsg_init(&msg->base, call_route);
+		cmsg_init(&msg->base, iproto_thread->call_route);
 		break;
 	case IPROTO_EXECUTE:
 	case IPROTO_PREPARE:
 		if (xrow_decode_sql(&msg->header, &msg->sql) != 0)
 			goto error;
-		cmsg_init(&msg->base, sql_route);
+		cmsg_init(&msg->base, iproto_thread->sql_route);
 		break;
 	case IPROTO_PING:
-		cmsg_init(&msg->base, misc_route);
+		cmsg_init(&msg->base, iproto_thread->misc_route);
 		break;
 	case IPROTO_JOIN:
 	case IPROTO_FETCH_SNAPSHOT:
 	case IPROTO_REGISTER:
-		cmsg_init(&msg->base, join_route);
+		cmsg_init(&msg->base, iproto_thread->join_route);
 		*stop_input = true;
 		break;
 	case IPROTO_SUBSCRIBE:
-		cmsg_init(&msg->base, subscribe_route);
+		cmsg_init(&msg->base, iproto_thread->subscribe_route);
 		*stop_input = true;
 		break;
 	case IPROTO_VOTE_DEPRECATED:
 	case IPROTO_VOTE:
-		cmsg_init(&msg->base, misc_route);
+		cmsg_init(&msg->base, iproto_thread->misc_route);
 		break;
 	case IPROTO_AUTH:
 		if (xrow_decode_auth(&msg->header, &msg->auth))
 			goto error;
-		cmsg_init(&msg->base, misc_route);
+		cmsg_init(&msg->base, iproto_thread->misc_route);
 		break;
 	default:
 		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
@@ -1323,7 +1299,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 	diag_log();
 	diag_create(&msg->diag);
 	diag_move(&fiber()->diag, &msg->diag);
-	cmsg_init(&msg->base, error_route);
+	cmsg_init(&msg->base, iproto_thread->error_route);
 }
 
 static void
@@ -1436,11 +1412,12 @@ net_discard_input(struct cmsg *m)
 static void
 tx_discard_input(struct iproto_msg *msg)
 {
+	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
 	static const struct cmsg_hop discard_input_route[] = {
 		{ net_discard_input, NULL },
 	};
 	cmsg_init(&msg->discard_input, discard_input_route);
-	cpipe_push(&net_pipe, &msg->discard_input);
+	cpipe_push(&iproto_thread->net_pipe, &msg->discard_input);
 }
 
 /**
@@ -1990,7 +1967,8 @@ net_send_greeting(struct cmsg *m)
 
 		if (nwr > 0) {
 			/* Count statistics. */
-			rmean_collect(rmean_net, IPROTO_SENT, nwr);
+			rmean_collect(con->iproto_thread->rmean,
+				      IPROTO_SENT, nwr);
 		} else if (nwr < 0 && ! sio_wouldblock(errno)) {
 			diag_log();
 		}
@@ -2011,24 +1989,23 @@ net_send_greeting(struct cmsg *m)
 	iproto_msg_delete(msg);
 }
 
-static const struct cmsg_hop connect_route[] = {
-	{ tx_process_connect, &net_pipe },
-	{ net_send_greeting, NULL },
-};
-
 /** }}} */
 
 /**
  * Create a connection and start input.
  */
 static int
-iproto_on_accept(struct evio_service * /* service */, int fd,
+iproto_on_accept(struct evio_service *service, int fd,
 		 struct sockaddr *addr, socklen_t addrlen)
 {
 	(void) addr;
 	(void) addrlen;
 	struct iproto_msg *msg;
-	struct iproto_connection *con = iproto_connection_new(fd);
+
+	struct iproto_thread *iproto_thread =
+		(struct iproto_thread *)service->on_accept_param;
+	struct iproto_connection *con =
+		iproto_connection_new(iproto_thread, fd);
 	if (con == NULL)
 		return -1;
 	/*
@@ -2038,50 +2015,57 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 	 */
 	msg = iproto_msg_new(con);
 	if (msg == NULL) {
-		mempool_free(&iproto_connection_pool, con);
+		mempool_free(&con->iproto_thread->iproto_connection_pool, con);
 		return -1;
 	}
-	cmsg_init(&msg->base, connect_route);
+	cmsg_init(&msg->base, iproto_thread->connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
-	cpipe_push(&tx_pipe, &msg->base);
+	cpipe_push(&iproto_thread->tx_pipe, &msg->base);
 	return 0;
 }
 
-static struct evio_service binary; /* iproto binary listener */
-
 /**
  * The network io thread main function:
  * begin serving the message bus.
  */
 static int
-net_cord_f(va_list /* ap */)
+net_cord_f(va_list ap)
 {
-	mempool_create(&iproto_msg_pool, &cord()->slabc,
+	struct iproto_thread *iproto_thread =
+		va_arg(ap, struct iproto_thread *);
+
+	mempool_create(&iproto_thread->iproto_msg_pool, &cord()->slabc,
 		       sizeof(struct iproto_msg));
-	mempool_create(&iproto_connection_pool, &cord()->slabc,
+	mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
 		       sizeof(struct iproto_connection));
 
-	evio_service_init(loop(), &binary, "binary",
-			  iproto_on_accept, NULL);
+	evio_service_init(loop(), &iproto_thread->binary, "binary",
+			  iproto_on_accept, iproto_thread);
+
+	char endpoint_name[ENDPOINT_NAME_MAX];
+	snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u",
+		 iproto_thread->id);
 
 	struct cbus_endpoint endpoint;
 	/* Create "net" endpoint. */
-	cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
+	cbus_endpoint_create(&endpoint, endpoint_name,
+			     fiber_schedule_cb, fiber());
 	/* Create a pipe to "tx" thread. */
-	cpipe_create(&tx_pipe, "tx");
-	cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
+	cpipe_create(&iproto_thread->tx_pipe, "tx");
+	cpipe_set_max_input(&iproto_thread->tx_pipe, iproto_msg_max / 2);
+
 	/* Process incomming messages. */
 	cbus_loop(&endpoint);
 
-	cpipe_destroy(&tx_pipe);
+	cpipe_destroy(&iproto_thread->tx_pipe);
 	/*
 	 * Nothing to do in the fiber so far, the service
 	 * will take care of creating events for incoming
 	 * connections.
 	 */
-	if (evio_service_is_active(&binary))
-		evio_service_stop(&binary);
+	if (evio_service_is_active(&iproto_thread->binary))
+		evio_service_stop(&iproto_thread->binary);
 
 	return 0;
 }
@@ -2124,11 +2108,12 @@ static void
 tx_begin_push(struct iproto_connection *con)
 {
 	assert(! con->tx.is_push_sent);
-	cmsg_init(&con->kharon.base, push_route);
+	cmsg_init(&con->kharon.base, con->iproto_thread->push_route);
 	iproto_wpos_create(&con->kharon.wpos, con->tx.p_obuf);
 	con->tx.is_push_pending = false;
 	con->tx.is_push_sent = true;
-	cpipe_push(&net_pipe, (struct cmsg *) &con->kharon);
+	cpipe_push(&con->iproto_thread->net_pipe,
+		   (struct cmsg *) &con->kharon);
 }
 
 static void
@@ -2176,41 +2161,166 @@ iproto_session_push(struct session *session, struct port *port)
 
 /** }}} */
 
-/** Initialize the iproto subsystem and start network io thread */
-void
-iproto_init(void)
-{
-	slab_cache_create(&net_slabc, &runtime);
+static inline void
+iproto_thread_init_routes(struct iproto_thread *iproto_thread)
+{
+	iproto_thread->destroy_route[0] =
+		{ tx_process_destroy, &iproto_thread->net_pipe };
+	iproto_thread->destroy_route[1] =
+		{ net_finish_destroy, NULL };
+	iproto_thread->disconnect_route[0] =
+		{ tx_process_disconnect, &iproto_thread->net_pipe };
+	iproto_thread->disconnect_route[1] =
+		{ net_finish_disconnect, NULL };
+	iproto_thread->misc_route[0] =
+		{ tx_process_misc, &iproto_thread->net_pipe };
+	iproto_thread->misc_route[1] = { net_send_msg, NULL };
+	iproto_thread->call_route[0] =
+		{ tx_process_call, &iproto_thread->net_pipe };
+	iproto_thread->call_route[1] = { net_send_msg, NULL };
+	iproto_thread->select_route[0] =
+		{ tx_process_select, &iproto_thread->net_pipe };
+	iproto_thread->select_route[1] = { net_send_msg, NULL };
+	iproto_thread->process1_route[0] =
+		{ tx_process1, &iproto_thread->net_pipe };
+	iproto_thread->process1_route[1] = { net_send_msg, NULL };
+	iproto_thread->sql_route[0] =
+		{ tx_process_sql, &iproto_thread->net_pipe };
+	iproto_thread->sql_route[1] = { net_send_msg, NULL };
+	iproto_thread->join_route[0] =
+		{ tx_process_replication, &iproto_thread->net_pipe };
+	iproto_thread->join_route[1] = { net_end_join, NULL };
+	iproto_thread->subscribe_route[0] =
+		{ tx_process_replication, &iproto_thread->net_pipe };
+	iproto_thread->subscribe_route[1] = { net_end_subscribe, NULL };
+	iproto_thread->error_route[0] =
+		{ tx_reply_iproto_error, &iproto_thread->net_pipe };
+	iproto_thread->error_route[1] = { net_send_error, NULL };
+	iproto_thread->push_route[0] =
+		{ iproto_process_push, &iproto_thread->tx_pipe };
+	iproto_thread->push_route[1] = { tx_end_push, NULL };
+	/* IPROTO_OK */
+	iproto_thread->dml_route[0] = NULL;
+	/* IPROTO_SELECT */
+	iproto_thread->dml_route[1] = iproto_thread->select_route;
+	/* IPROTO_INSERT */
+	iproto_thread->dml_route[2] = iproto_thread->process1_route;
+	/* IPROTO_REPLACE */
+	iproto_thread->dml_route[3] = iproto_thread->process1_route;
+	/* IPROTO_UPDATE */
+	iproto_thread->dml_route[4] = iproto_thread->process1_route;
+	/* IPROTO_DELETE */
+	iproto_thread->dml_route[5] = iproto_thread->process1_route;
+	/* IPROTO_CALL_16 */
+	iproto_thread->dml_route[6] = iproto_thread->call_route;
+	/* IPROTO_AUTH */
+	iproto_thread->dml_route[7] = iproto_thread->misc_route;
+	/* IPROTO_EVAL */
+	iproto_thread->dml_route[8] = iproto_thread->call_route;
+	/* IPROTO_UPSERT */
+	iproto_thread->dml_route[9] = iproto_thread->process1_route;
+	/* IPROTO_CALL */
+	iproto_thread->dml_route[10] = iproto_thread->call_route;
+	/* IPROTO_EXECUTE */
+	iproto_thread->dml_route[11] = iproto_thread->sql_route;
+	/* IPROTO_NOP */
+	iproto_thread->dml_route[12] = NULL;
+	/* IPROTO_PREPARE */
+	iproto_thread->dml_route[13] = iproto_thread->sql_route;
+	iproto_thread->connect_route[0] =
+		{ tx_process_connect, &iproto_thread->net_pipe };
+	iproto_thread->connect_route[1] = { net_send_greeting, NULL };
+}
 
+static inline int
+iproto_thread_init(struct iproto_thread *iproto_thread)
+{
+	iproto_thread_init_routes(iproto_thread);
+	slab_cache_create(&iproto_thread->net_slabc, &runtime);
 	/* Init statistics counter */
-	rmean_net = rmean_new(rmean_net_strings, IPROTO_LAST);
-	if (rmean_net == NULL) {
-		slab_cache_destroy(&net_slabc);
-		tnt_raise(OutOfMemory, sizeof(struct rmean),
-			  "rmean", "struct rmean");
+	iproto_thread->rmean = rmean_new(rmean_net_strings, IPROTO_LAST);
+	if (iproto_thread->rmean == NULL) {
+		slab_cache_destroy(&iproto_thread->net_slabc);
+		diag_set(OutOfMemory, sizeof(struct rmean),
+			  "rmean_new", "struct rmean");
+		return -1;
 	}
 
-	if (cord_costart(&net_cord, "iproto", net_cord_f, NULL)) {
-		rmean_delete(rmean_net);
-		slab_cache_destroy(&net_slabc);
-		panic("failed to initialize iproto thread");
-	}
+	return 0;
+}
 
-	/* Create a pipe to "net" thread. */
-	cpipe_create(&net_pipe, "net");
-	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
+/** Initialize the iproto subsystem and start network io thread */
+void
+iproto_init(int threads_count)
+{
+	iproto_threads_count = 0;
 	struct session_vtab iproto_session_vtab = {
 		/* .push = */ iproto_session_push,
 		/* .fd = */ iproto_session_fd,
 		/* .sync = */ iproto_session_sync,
 	};
+
+
+	iproto_threads = (struct iproto_thread *)
+		calloc(threads_count, sizeof(struct iproto_thread));
+	if (iproto_threads == NULL) {
+		tnt_raise(OutOfMemory, threads_count *
+			  sizeof(struct iproto_thread), "calloc",
+			  "struct iproto_thread");
+	}
+
+	for (int i = 0; i < threads_count; i++, iproto_threads_count++) {
+		struct iproto_thread *iproto_thread = &iproto_threads[i];
+		iproto_thread->id = i;
+		if (iproto_thread_init(iproto_thread) != 0)
+			goto fail;
+
+		if (cord_costart(&iproto_thread->net_cord, "iproto",
+				 net_cord_f, iproto_thread)) {
+			rmean_delete(iproto_thread->rmean);
+			slab_cache_destroy(&iproto_thread->net_slabc);
+			goto fail;
+		}
+		iproto_thread->stopped_connections =
+			RLIST_HEAD_INITIALIZER(
+				iproto_thread->stopped_connections);
+		/* Create a pipe to "net" thread. */
+		char endpoint_name[ENDPOINT_NAME_MAX];
+		snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u",
+			 iproto_thread->id);
+		cpipe_create(&iproto_thread->net_pipe, endpoint_name);
+		cpipe_set_max_input(&iproto_thread->net_pipe,
+				    iproto_msg_max / 2);
+	}
+
 	session_vtab_registry[SESSION_TYPE_BINARY] = iproto_session_vtab;
+	return;
+
+fail:
+	iproto_free();
+	diag_raise();
 }
 
 /** Available iproto configuration changes. */
 enum iproto_cfg_op {
+	/** Command code to set max input for iproto thread */
 	IPROTO_CFG_MSG_MAX,
-	IPROTO_CFG_LISTEN
+	/**
+	 * Command code to start the listening socket contained
+	 * in the evio_service object.
+	 */
+	IPROTO_CFG_LISTEN,
+	/**
+	 * Command code to stop the listening socket contained
+	 * in the evio_service object. When the user sets new
+	 * iproto parameters, the listening sockets in the iproto
+	 * threads must be stopped before reconfiguration.
+	 */
+	IPROTO_CFG_STOP,
+	/**
+	 * Command code to get statistics from an iproto thread.
+	 */
+	IPROTO_CFG_STAT,
 };
 
 /**
@@ -2224,20 +2334,32 @@ struct iproto_cfg_msg: public cbus_call_msg
 	/** Operation to execute in iproto thread. */
 	enum iproto_cfg_op op;
 	union {
+		/** Statistics structure. */
 		struct {
-			/** New URI to bind to. */
-			const char *uri;
-			/** Result address. */
-			struct sockaddr_storage addr;
-			/** Address length. */
-			socklen_t addrlen;
+			size_t mem_used;
+			size_t connections;
+			size_t requests;
 		};
+		/** Pointer to evio_service, used for binding. */
+		struct evio_service *binary;
 
 		/** New iproto max message count. */
 		int iproto_msg_max;
 	};
+	struct iproto_thread *iproto_thread;
 };
 
+static inline void
+iproto_thread_fill_binary(struct iproto_thread *iproto_thread,
+			  struct evio_service *binary)
+{
+	strcpy(iproto_thread->binary.host, binary->host);
+	strcpy(iproto_thread->binary.serv, binary->serv);
+	iproto_thread->binary.addrstorage = binary->addrstorage;
+	iproto_thread->binary.addr_len = binary->addr_len;
+	ev_io_set(&iproto_thread->binary.ev, binary->ev.fd, EV_READ);
+}
+
 static inline void
 iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op)
 {
@@ -2245,30 +2367,52 @@ iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op)
 	msg->op = op;
 }
 
+static void
+iproto_fill_stat(struct iproto_thread *iproto_thread,
+		 struct iproto_cfg_msg *cfg_msg)
+{
+	cfg_msg->mem_used =
+		slab_cache_used(&iproto_thread->net_cord.slabc) +
+		slab_cache_used(&iproto_thread->net_slabc);
+	cfg_msg->connections =
+		mempool_count(&iproto_thread->iproto_connection_pool);
+	cfg_msg->requests =
+		mempool_count(&iproto_thread->iproto_msg_pool);
+}
+
 static int
 iproto_do_cfg_f(struct cbus_call_msg *m)
 {
 	struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m;
 	int old;
+	struct iproto_thread *iproto_thread = cfg_msg->iproto_thread;
+
 	try {
 		switch (cfg_msg->op) {
 		case IPROTO_CFG_MSG_MAX:
-			cpipe_set_max_input(&tx_pipe,
+			cpipe_set_max_input(&iproto_thread->tx_pipe,
 					    cfg_msg->iproto_msg_max / 2);
 			old = iproto_msg_max;
 			iproto_msg_max = cfg_msg->iproto_msg_max;
 			if (old < iproto_msg_max)
-				iproto_resume();
+				iproto_resume(iproto_thread);
 			break;
 		case IPROTO_CFG_LISTEN:
-			if (evio_service_is_active(&binary))
-				evio_service_stop(&binary);
-			if (cfg_msg->uri != NULL &&
-			    (evio_service_bind(&binary, cfg_msg->uri) != 0 ||
-			     evio_service_listen(&binary) != 0))
+			if (evio_service_is_active(&iproto_thread->binary)) {
+				diag_set(ClientError, ER_UNSUPPORTED, "Iproto",
+					 "listen if service already active");
 				diag_raise();
-			cfg_msg->addrlen = binary.addr_len;
-			cfg_msg->addr = binary.addrstorage;
+			}
+			iproto_thread_fill_binary(iproto_thread, cfg_msg->binary);
+			if (evio_service_listen(&iproto_thread->binary) != 0)
+				diag_raise();
+			break;
+		case IPROTO_CFG_STOP:
+			if (evio_service_is_active(&iproto_thread->binary))
+				evio_service_stop(&iproto_thread->binary);
+			break;
+		case IPROTO_CFG_STAT:
+			iproto_fill_stat(iproto_thread, cfg_msg);
 			break;
 		default:
 			unreachable();
@@ -2276,50 +2420,129 @@ iproto_do_cfg_f(struct cbus_call_msg *m)
 	} catch (Exception *e) {
 		return -1;
 	}
+
 	return 0;
 }
 
 static inline void
-iproto_do_cfg(struct iproto_cfg_msg *msg)
+iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg)
 {
-	if (cbus_call(&net_pipe, &tx_pipe, msg, iproto_do_cfg_f,
-		      NULL, TIMEOUT_INFINITY) != 0)
+	msg->iproto_thread = iproto_thread;
+	if (cbus_call(&iproto_thread->net_pipe, &iproto_thread->tx_pipe, msg,
+		      iproto_do_cfg_f, NULL, TIMEOUT_INFINITY) != 0)
 		diag_raise();
 }
 
-void
-iproto_listen(const char *uri)
+static inline void
+iproto_send_stop_msg(void)
+{
+	struct iproto_cfg_msg cfg_msg;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STOP);
+	for (int i = 0; i < iproto_threads_count; i++)
+		iproto_do_cfg(&iproto_threads[i], &cfg_msg);
+}
+
+static inline void
+iproto_send_listen_msg(struct evio_service *binary)
 {
 	struct iproto_cfg_msg cfg_msg;
 	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_LISTEN);
-	cfg_msg.uri = uri;
-	iproto_do_cfg(&cfg_msg);
-	iproto_bound_address_storage = cfg_msg.addr;
-	iproto_bound_address_len = cfg_msg.addrlen;
+	cfg_msg.binary = binary;
+	for (int i = 0; i < iproto_threads_count; i++)
+		iproto_do_cfg(&iproto_threads[i], &cfg_msg);
+}
+
+void
+iproto_listen(const char *uri)
+{
+	struct evio_service binary;
+	memset(&binary, 0, sizeof(binary));
+
+	iproto_send_stop_msg();
+	if (uri != NULL) {
+		/*
+		 * Note that we bind the socket in the main thread and
+		 * then listen on it in all iproto threads. With this
+		 * implementation, we rely on the Linux kernel to
+		 * distribute incoming connections across iproto threads.
+		 */
+		if (evio_service_bind(&binary, uri) != 0)
+			diag_raise();
+		iproto_send_listen_msg(&binary);
+	}
+
+	iproto_bound_address_storage = binary.addrstorage;
+	iproto_bound_address_len = binary.addr_len;
 }
 
 size_t
 iproto_mem_used(void)
 {
-	return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
+	struct iproto_cfg_msg cfg_msg;
+	size_t mem = 0;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STAT);
+#ifndef NDEBUG
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT);
+	if (inj->iparam >= 0 && inj->iparam < iproto_threads_count) {
+		iproto_do_cfg(&iproto_threads[inj->iparam], &cfg_msg);
+		return cfg_msg.mem_used;
+	}
+#endif
+	for (int i = 0; i < iproto_threads_count; i++) {
+		iproto_do_cfg(&iproto_threads[i], &cfg_msg);
+		mem += cfg_msg.mem_used;
+	}
+	return mem;
 }
 
 size_t
 iproto_connection_count(void)
 {
-	return mempool_count(&iproto_connection_pool);
+	struct iproto_cfg_msg cfg_msg;
+	size_t count = 0;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STAT);
+#ifndef NDEBUG
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT);
+	if (inj->iparam >= 0 && inj->iparam < iproto_threads_count) {
+		iproto_do_cfg(&iproto_threads[inj->iparam], &cfg_msg);
+		return cfg_msg.connections;
+	}
+#endif
+	for (int i = 0; i < iproto_threads_count; i++) {
+		iproto_do_cfg(&iproto_threads[i], &cfg_msg);
+		count += cfg_msg.connections;
+	}
+	return count;
 }
 
 size_t
 iproto_request_count(void)
 {
-	return mempool_count(&iproto_msg_pool);
+	struct iproto_cfg_msg cfg_msg;
+	size_t count = 0;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STAT);
+#ifndef NDEBUG
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT);
+	if (inj->iparam >= 0 && inj->iparam < iproto_threads_count) {
+		iproto_do_cfg(&iproto_threads[inj->iparam], &cfg_msg);
+		return cfg_msg.requests;
+	}
+#endif
+	for (int i = 0; i < iproto_threads_count; i++) {
+		iproto_do_cfg(&iproto_threads[i], &cfg_msg);
+		count += cfg_msg.requests;
+	}
+	return count;
 }
 
 void
 iproto_reset_stat(void)
 {
-	rmean_cleanup(rmean_net);
+	for (int i = 0; i < iproto_threads_count; i++)
+		rmean_cleanup(iproto_threads[i].rmean);
 }
 
 void
@@ -2333,23 +2556,51 @@ iproto_set_msg_max(int new_iproto_msg_max)
 	struct iproto_cfg_msg cfg_msg;
 	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_MSG_MAX);
 	cfg_msg.iproto_msg_max = new_iproto_msg_max;
-	iproto_do_cfg(&cfg_msg);
-	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
+	for (int i = 0; i < iproto_threads_count; i++) {
+		iproto_do_cfg(&iproto_threads[i], &cfg_msg);
+		cpipe_set_max_input(&iproto_threads[i].net_pipe,
+				    new_iproto_msg_max / 2);
+	}
 }
 
 void
 iproto_free(void)
 {
-	tt_pthread_cancel(net_cord.id);
-	tt_pthread_join(net_cord.id, NULL);
-	/*
-	* Close socket descriptor to prevent hot standby instance
-	* failing to bind in case it tries to bind before socket
-	* is closed by OS.
-	*/
-	if (evio_service_is_active(&binary))
-		close(binary.ev.fd);
-
-	rmean_delete(rmean_net);
-	slab_cache_destroy(&net_slabc);
+	for (int i = 0; i < iproto_threads_count; i++) {
+		tt_pthread_cancel(iproto_threads[i].net_cord.id);
+		tt_pthread_join(iproto_threads[i].net_cord.id, NULL);
+		/*
+		 * Close socket descriptor to prevent hot standby instance
+		 * failing to bind in case it tries to bind before socket
+		 * is closed by OS.
+		 */
+		if (evio_service_is_active(&iproto_threads[i].binary))
+			close(iproto_threads[i].binary.ev.fd);
+
+		rmean_delete(iproto_threads[i].rmean);
+		slab_cache_destroy(&iproto_threads[i].net_slabc);
+	}
+	free(iproto_threads);
+}
+
+int
+iproto_rmean_foreach(void *cb, void *cb_ctx)
+{
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT);
+	for (size_t i = 0; i < IPROTO_LAST; i++) {
+		int64_t mean = 0;
+		int64_t total = 0;
+		for (int j = 0; j < iproto_threads_count; j++)  {
+			if (inj != NULL && inj->iparam >= 0 && inj->iparam != j)
+				continue;
+			mean += rmean_mean(iproto_threads[j].rmean, i);
+			total += rmean_total(iproto_threads[j].rmean, i);
+		}
+		int rc = ((rmean_cb)cb)(rmean_net_strings[i], mean,
+					total, cb_ctx);
+		if (rc != 0)
+			return rc;
+	}
+	return 0;
 }
diff --git a/src/box/iproto.h b/src/box/iproto.h
index f6f7101a19..d360f65e4a 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -87,11 +87,14 @@ iproto_reset_stat(void);
 const char *
 iproto_bound_address(char *buf);
 
+int
+iproto_rmean_foreach(void *cb, void *cb_ctx);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 
 void
-iproto_init(void);
+iproto_init(int threads_count);
 
 void
 iproto_listen(const char *uri);
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 885a0cac1b..e10e33daef 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -44,6 +44,7 @@ local default_cfg = {
     memtx_max_tuple_size = 1024 * 1024,
     slab_alloc_granularity = 8,
     slab_alloc_factor   = 1.05,
+    iproto_threads      = 1,
     work_dir            = nil,
     memtx_dir           = ".",
     wal_dir             = ".",
@@ -128,6 +129,7 @@ local template_cfg = {
     memtx_max_tuple_size  = 'number',
     slab_alloc_granularity = 'number',
     slab_alloc_factor   = 'number',
+    iproto_threads      = 'number',
     work_dir            = 'string',
     memtx_dir            = 'string',
     wal_dir             = 'string',
diff --git a/src/box/lua/stat.c b/src/box/lua/stat.c
index 29ec38b26b..2ad6bd478c 100644
--- a/src/box/lua/stat.c
+++ b/src/box/lua/stat.c
@@ -49,8 +49,6 @@
 
 extern struct rmean *rmean_box;
 extern struct rmean *rmean_error;
-/** network statistics (iproto & cbus) */
-extern struct rmean *rmean_net;
 extern struct rmean *rmean_tx_wal_bus;
 
 static void
@@ -148,7 +146,7 @@ static int
 lbox_stat_net_index(struct lua_State *L)
 {
 	const char *key = luaL_checkstring(L, -1);
-	if (rmean_foreach(rmean_net, seek_stat_item, L) == 0)
+	if (iproto_rmean_foreach(seek_stat_item, L) == 0)
 		return 0;
 
 	if (strcmp(key, "CONNECTIONS") == 0) {
@@ -183,7 +181,7 @@ static int
 lbox_stat_net_call(struct lua_State *L)
 {
 	lua_newtable(L);
-	rmean_foreach(rmean_net, set_stat_item, L);
+	iproto_rmean_foreach(set_stat_item, L);
 
 	lua_pushstring(L, "CONNECTIONS");
 	lua_rawget(L, -2);
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 756899eff4..45ad22c4c5 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -151,6 +151,7 @@ struct errinj {
 	_(ERRINJ_APPLIER_SLOW_ACK, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_STDIN_ISATTY, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_SNAP_COMMIT_FAIL, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT, {.iparam = -1}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 1fdd9a2272..3e5caf448c 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -16,6 +16,7 @@ feedback_host:https://feedback.tarantool.io
 feedback_interval:3600
 force_recovery:false
 hot_standby:false
+iproto_threads:1
 listen:port
 log:tarantool.log
 log_format:plain
diff --git a/test/box/admin.result b/test/box/admin.result
index f8e8808e32..6d307eeccf 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -53,6 +53,8 @@ cfg_filter(box.cfg)
     - false
   - - hot_standby
     - false
+  - - iproto_threads
+    - 1
   - - listen
     - <hidden>
   - - log
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 693c1b5216..6965933b59 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -41,6 +41,8 @@ cfg_filter(box.cfg)
  |     - false
  |   - - hot_standby
  |     - false
+ |   - - iproto_threads
+ |     - 1
  |   - - listen
  |     - <hidden>
  |   - - log
@@ -162,6 +164,8 @@ cfg_filter(box.cfg)
  |     - false
  |   - - hot_standby
  |     - false
+ |   - - iproto_threads
+ |     - 1
  |   - - listen
  |     - <hidden>
  |   - - log
diff --git a/test/box/errinj.result b/test/box/errinj.result
index d1cbacd157..44ecafc40f 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -57,6 +57,7 @@ evals
   - ERRINJ_HTTP_RESPONSE_ADD_WAIT: false
   - ERRINJ_INDEX_ALLOC: false
   - ERRINJ_INDEX_RESERVE: false
+  - ERRINJ_IPROTO_SINGLE_THREAD_STAT: -1
   - ERRINJ_IPROTO_TX_DELAY: false
   - ERRINJ_LOG_ROTATE: false
   - ERRINJ_MEMTX_DELAY_GC: false
diff --git a/test/box/gh-5645-several-iproto-threads.lua b/test/box/gh-5645-several-iproto-threads.lua
new file mode 100755
index 0000000000..5617c97adf
--- /dev/null
+++ b/test/box/gh-5645-several-iproto-threads.lua
@@ -0,0 +1,17 @@
+#!/usr/bin/env tarantool
+
+require('console').listen(os.getenv('ADMIN'))
+
+box.cfg({
+    listen = os.getenv('LISTEN'),
+    iproto_threads = tonumber(arg[1]),
+    wal_mode='none'
+})
+
+box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe')
+function errinj_set(thread_id)
+    if thread_id ~= nil then
+        box.error.injection.set("ERRINJ_IPROTO_SINGLE_THREAD_STAT", thread_id)
+    end
+end
+function ping() return "pong" end
diff --git a/test/box/gh-5645-several-iproto-threads.result b/test/box/gh-5645-several-iproto-threads.result
new file mode 100644
index 0000000000..f6bd5f9686
--- /dev/null
+++ b/test/box/gh-5645-several-iproto-threads.result
@@ -0,0 +1,162 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+net_box = require('net.box')
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+test_run:cmd("create server test with script='box/gh-5645-several-iproto-threads.lua'")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function iproto_call(server_addr, fibers_count)
+    local fibers = {}
+    for i = 1, fibers_count do
+        fibers[i] = fiber.new(function()
+            local conn = net_box.new(server_addr)
+            for _ = 1, 100 do
+                pcall(conn.call, conn, 'ping')
+            end
+            conn:close()
+        end)
+        fibers[i]:set_joinable(true)
+    end
+    for _, f in ipairs(fibers) do
+        f:join()
+    end
+end;
+ | ---
+ | ...
+function get_network_stat()
+    local total_net_stat_table = test_run:cmd("eval test 'return box.stat.net()'")[1]
+    assert(total_net_stat_table ~= nil)
+    local connections = 0
+    local requests = 0
+    local sent = 0
+    local received = 0
+    for name, net_stat_table in pairs(total_net_stat_table) do
+        assert(net_stat_table ~= nil)
+        if name == "CONNECTIONS" then
+            for name, val in pairs(net_stat_table) do
+                if name == "total" then
+                    connections = val
+                end
+            end
+        elseif name == "REQUESTS" then
+            for name, val in pairs(net_stat_table) do
+                if name == "total" then
+                    requests = val
+                end
+            end
+        elseif name == "SENT" then
+            for name, val in pairs(net_stat_table) do
+                if name == "total" then
+                    sent = val
+                end
+            end
+        elseif name == "RECEIVED" then
+            for name, val in pairs(net_stat_table) do
+                if name == "total" then
+                    received = val
+                end
+            end
+        else
+            assert(false)
+        end
+    end
+    return connections, requests, sent, received
+end
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | ...
+
+-- Check that the per-thread statistics summed over all threads are
+-- equal to the total statistics gathered from all threads.
+--
+thread_count = 2
+ | ---
+ | ...
+fibers_count = 100
+ | ---
+ | ...
+test_run:cmd(string.format("start server test with args=\"%s\"", thread_count))
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+iproto_call(server_addr, fibers_count)
+ | ---
+ | ...
+-- Total statistics from all threads.
+conn_t, req_t, sent_t, rec_t = get_network_stat()
+ | ---
+ | ...
+-- Statistics per thread.
+conn, req, sent, rec = 0, 0, 0, 0
+ | ---
+ | ...
+assert(conn_t == fibers_count)
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+for thread_id = 1, thread_count do
+    test_run:eval("test", string.format("errinj_set(%d)", thread_id - 1))
+    local conn_c, req_c, sent_c, rec_c = get_network_stat()
+    conn = conn + conn_c
+    req = req + req_c
+    sent = sent + sent_c
+    rec = rec + rec_c
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+assert(conn_t == conn)
+ | ---
+ | - true
+ | ...
+assert(req_t == req)
+ | ---
+ | - true
+ | ...
+assert(sent_t == sent)
+ | ---
+ | - true
+ | ...
+assert(rec_t == rec)
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+test_run:cmd("cleanup server test")
+ | ---
+ | - true
+ | ...
+test_run:cmd("delete server test")
+ | ---
+ | - true
+ | ...
diff --git a/test/box/gh-5645-several-iproto-threads.test.lua b/test/box/gh-5645-several-iproto-threads.test.lua
new file mode 100755
index 0000000000..267efe072a
--- /dev/null
+++ b/test/box/gh-5645-several-iproto-threads.test.lua
@@ -0,0 +1,96 @@
+env = require('test_run')
+net_box = require('net.box')
+fiber = require('fiber')
+test_run = env.new()
+test_run:cmd("create server test with script='box/gh-5645-several-iproto-threads.lua'")
+
+test_run:cmd("setopt delimiter ';'")
+function iproto_call(server_addr, fibers_count)
+    local fibers = {}
+    for i = 1, fibers_count do
+        fibers[i] = fiber.new(function()
+            local conn = net_box.new(server_addr)
+            for _ = 1, 100 do
+                pcall(conn.call, conn, 'ping')
+            end
+            conn:close()
+        end)
+        fibers[i]:set_joinable(true)
+    end
+    for _, f in ipairs(fibers) do
+        f:join()
+    end
+end;
+function get_network_stat()
+    local total_net_stat_table = test_run:cmd("eval test 'return box.stat.net()'")[1]
+    assert(total_net_stat_table ~= nil)
+    local connections = 0
+    local requests = 0
+    local sent = 0
+    local received = 0
+    for name, net_stat_table in pairs(total_net_stat_table) do
+        assert(net_stat_table ~= nil)
+        if name == "CONNECTIONS" then
+            for name, val in pairs(net_stat_table) do
+                if name == "total" then
+                    connections = val
+                end
+            end
+        elseif name == "REQUESTS" then
+            for name, val in pairs(net_stat_table) do
+                if name == "total" then
+                    requests = val
+                end
+            end
+        elseif name == "SENT" then
+            for name, val in pairs(net_stat_table) do
+                if name == "total" then
+                    sent = val
+                end
+            end
+        elseif name == "RECEIVED" then
+            for name, val in pairs(net_stat_table) do
+                if name == "total" then
+                    received = val
+                end
+            end
+        else
+            assert(false)
+        end
+    end
+    return connections, requests, sent, received
+end
+test_run:cmd("setopt delimiter ''");
+
+-- Check that the per-thread statistics summed over all threads are
+-- equal to the total statistics gathered from all threads.
+--
+thread_count = 2
+fibers_count = 100
+test_run:cmd(string.format("start server test with args=\"%s\"", thread_count))
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+iproto_call(server_addr, fibers_count)
+-- Total statistics from all threads.
+conn_t, req_t, sent_t, rec_t = get_network_stat()
+-- Statistics per thread.
+conn, req, sent, rec = 0, 0, 0, 0
+assert(conn_t == fibers_count)
+
+test_run:cmd("setopt delimiter ';'")
+for thread_id = 1, thread_count do
+    test_run:eval("test", string.format("errinj_set(%d)", thread_id - 1))
+    local conn_c, req_c, sent_c, rec_c = get_network_stat()
+    conn = conn + conn_c
+    req = req + req_c
+    sent = sent + sent_c
+    rec = rec + rec_c
+end;
+test_run:cmd("setopt delimiter ''");
+assert(conn_t == conn)
+assert(req_t == req)
+assert(sent_t == sent)
+assert(rec_t == rec)
+
+test_run:cmd("stop server test")
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/suite.ini b/test/box/suite.ini
index d5f72e5592..4943974489 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
 disabled = rtree_errinj.test.lua tuple_bench.test.lua
 long_run = huge_field_map_long.test.lua
 config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua
 lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
 use_unix_sockets = True
 use_unix_sockets_iproto = True
-- 
GitLab