diff --git a/.luacheckrc b/.luacheckrc index 703f86dc58d5e884322e8e2699be1bc394153266..80405eb2172c2cf3be9711eba28946f033324b58 100644 --- a/.luacheckrc +++ b/.luacheckrc @@ -76,6 +76,12 @@ files["test/box/box.lua"] = { "iproto_request", } } +files["test/box/gh-5645-several-iproto-threads.lua"] = { + globals = { + "errinj_set", + "ping", + }, +} files["test/box-tap/session.test.lua"] = { globals = { "session", diff --git a/changelogs/unreleased/several-iproto-threads.md b/changelogs/unreleased/several-iproto-threads.md new file mode 100644 index 0000000000000000000000000000000000000000..84aa1a285f3d41409c8d39746918a95fd49f92e2 --- /dev/null +++ b/changelogs/unreleased/several-iproto-threads.md @@ -0,0 +1,5 @@ +## feature/core + +* Implement ability to run multiple iproto threads, which is useful + in some specific workloads where iproto thread is the bottleneck + of throughput (gh-5645). \ No newline at end of file diff --git a/src/box/box.cc b/src/box/box.cc index b846ba8f5dc28dc28eaa19cd2f78b8a413e6a6c0..c60430a93c0c10e1b6695b0f794f0429b4d5fd6e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -3229,7 +3229,7 @@ box_cfg_xc(void) schema_init(); replication_init(); port_init(); - iproto_init(); + iproto_init(cfg_geti("iproto_threads")); sql_init(); int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size")); diff --git a/src/box/iproto.cc b/src/box/iproto.cc index aabb1f33aedf325752501c692f53c9eeeba55b2b..4bb49d3de38fe831b4ecc07afe5ceefb20664e5b 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -69,6 +69,10 @@ enum { IPROTO_PACKET_SIZE_MAX = 2UL * 1024 * 1024 * 1024, }; +enum { + ENDPOINT_NAME_MAX = 10 +}; + /** * A position in connection output buffer. * Since we use rotating buffers to recycle memory, @@ -87,6 +91,71 @@ iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out) wpos->svp = obuf_create_svp(out); } +struct iproto_thread { + /** + * Slab cache used for allocating memory for output network buffers + * in the tx thread. + */ + struct slab_cache net_slabc; + /** + * Network thread execution unit. + */ + struct cord net_cord; + /** + * A single global queue for all requests in all connections. All + * requests from all connections are processed concurrently. + * Is also used as a queue for just established connections and to + * execute disconnect triggers. A few notes about these triggers: + * - they need to be run in a fiber + * - unlike an ordinary request failure, on_connect trigger + * failure must lead to connection close. + * - on_connect trigger must be processed before any other + * request on this connection. + */ + struct cpipe tx_pipe; + struct cpipe net_pipe; + /** + * Static routes for this iproto thread + */ + struct cmsg_hop destroy_route[2]; + struct cmsg_hop disconnect_route[2]; + struct cmsg_hop misc_route[2]; + struct cmsg_hop call_route[2]; + struct cmsg_hop select_route[2]; + struct cmsg_hop process1_route[2]; + struct cmsg_hop sql_route[2]; + struct cmsg_hop join_route[2]; + struct cmsg_hop subscribe_route[2]; + struct cmsg_hop error_route[2]; + struct cmsg_hop push_route[2]; + struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX]; + struct cmsg_hop connect_route[2]; + /* + * Iproto thread memory pools + */ + struct mempool iproto_msg_pool; + struct mempool iproto_connection_pool; + /* + * List of stopped connections + */ + RLIST_HEAD(stopped_connections); + /* + * Iproto thread stat + */ + struct rmean *rmean; + /* + * Iproto thread id + */ + uint32_t id; + /* + * Iproto binary listener + */ + struct evio_service binary; +}; + +static struct iproto_thread *iproto_threads; +static int iproto_threads_count; + /** * In Greek mythology, Kharon is the ferryman who carries souls * of the newly deceased across the river Styx that divided the @@ -236,8 +305,6 @@ struct iproto_msg bool close_connection; }; -static struct mempool iproto_msg_pool; - static struct iproto_msg * iproto_msg_new(struct iproto_connection *con); @@ -245,46 +312,12 @@ iproto_msg_new(struct iproto_connection *con); * Resume stopped connections, if any. */ static void -iproto_resume(void); +iproto_resume(struct iproto_thread *iproto_thread); static void iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, bool *stop_input); -static inline void -iproto_msg_delete(struct iproto_msg *msg) -{ - mempool_free(&iproto_msg_pool, msg); - iproto_resume(); -} - -/** - * A single global queue for all requests in all connections. All - * requests from all connections are processed concurrently. - * Is also used as a queue for just established connections and to - * execute disconnect triggers. A few notes about these triggers: - * - they need to be run in a fiber - * - unlike an ordinary request failure, on_connect trigger - * failure must lead to connection close. - * - on_connect trigger must be processed before any other - * request on this connection. - */ -static struct cpipe tx_pipe; -static struct cpipe net_pipe; - -/** - * Network thread. - */ -static struct cord net_cord; - -/** - * Slab cache used for allocating memory for output network buffers - * in the tx thread. - */ -static struct slab_cache net_slabc; - -struct rmean *rmean_net; - enum rmean_net_name { IPROTO_SENT, IPROTO_RECEIVED, @@ -306,11 +339,6 @@ tx_process_destroy(struct cmsg *m); static void net_finish_destroy(struct cmsg *m); -static const struct cmsg_hop destroy_route[] = { - { tx_process_destroy, &net_pipe }, - { net_finish_destroy, NULL }, -}; - /** Fire on_disconnect triggers in the tx thread. */ static void tx_process_disconnect(struct cmsg *m); @@ -319,11 +347,6 @@ tx_process_disconnect(struct cmsg *m); static void net_finish_disconnect(struct cmsg *m); -static const struct cmsg_hop disconnect_route[] = { - { tx_process_disconnect, &net_pipe }, - { net_finish_disconnect, NULL } -}; - /** * Kharon is in the dead world (iproto). Schedule an event to * flush new obuf as reflected in the fresh wpos. @@ -341,12 +364,6 @@ iproto_process_push(struct cmsg *m); static void tx_end_push(struct cmsg *m); -static const struct cmsg_hop push_route[] = { - { iproto_process_push, &tx_pipe }, - { tx_end_push, NULL } -}; - - /* }}} */ /* {{{ iproto_connection - declaration and definition */ @@ -536,29 +553,37 @@ struct iproto_connection } tx; /** Authentication salt. */ char salt[IPROTO_SALT_SIZE]; + /** Iproto connection thread */ + struct iproto_thread *iproto_thread; }; -static struct mempool iproto_connection_pool; -static RLIST_HEAD(stopped_connections); - /** * Return true if we have not enough spare messages * in the message pool. */ static inline bool -iproto_check_msg_max(void) +iproto_check_msg_max(struct iproto_thread *iproto_thread) { - size_t request_count = mempool_count(&iproto_msg_pool); + size_t request_count = mempool_count(&iproto_thread->iproto_msg_pool); return request_count > (size_t) iproto_msg_max; } +static inline void +iproto_msg_delete(struct iproto_msg *msg) +{ + struct iproto_thread *iproto_thread = msg->connection->iproto_thread; + mempool_free(&msg->connection->iproto_thread->iproto_msg_pool, msg); + iproto_resume(iproto_thread); +} + static struct iproto_msg * iproto_msg_new(struct iproto_connection *con) { + struct mempool *iproto_msg_pool = &con->iproto_thread->iproto_msg_pool; struct iproto_msg *msg = - (struct iproto_msg *) mempool_alloc(&iproto_msg_pool); + (struct iproto_msg *) mempool_alloc(iproto_msg_pool); ERROR_INJECT(ERRINJ_TESTING, { - mempool_free(&iproto_msg_pool, msg); + mempool_free(&con->iproto_thread->iproto_msg_pool, msg); msg = NULL; }); if (msg == NULL) { @@ -569,7 +594,7 @@ iproto_msg_new(struct iproto_connection *con) } msg->close_connection = false; msg->connection = con; - rmean_collect(rmean_net, IPROTO_REQUESTS, 1); + rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1); return msg; } @@ -624,7 +649,8 @@ iproto_connection_stop_msg_max_limit(struct iproto_connection *con) * Important to add to tail and fetch from head to ensure * strict lifo order (fairness) for stopped connections. */ - rlist_add_tail(&stopped_connections, &con->in_stop_list); + rlist_add_tail(&con->iproto_thread->stopped_connections, + &con->in_stop_list); } /** @@ -653,7 +679,7 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con) * other parts of the connection. */ con->state = IPROTO_CONNECTION_DESTROYED; - cpipe_push(&tx_pipe, &con->destroy_msg); + cpipe_push(&con->iproto_thread->tx_pipe, &con->destroy_msg); } /** @@ -680,7 +706,7 @@ iproto_connection_close(struct iproto_connection *con) * is done only once. */ con->p_ibuf->wpos -= con->parse_size; - cpipe_push(&tx_pipe, &con->disconnect_msg); + cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg); assert(con->state == IPROTO_CONNECTION_ALIVE); con->state = IPROTO_CONNECTION_CLOSED; } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) { @@ -814,9 +840,9 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) bool stop_input = false; const char *errmsg; while (con->parse_size != 0 && !stop_input) { - if (iproto_check_msg_max()) { + if (iproto_check_msg_max(con->iproto_thread)) { iproto_connection_stop_msg_max_limit(con); - cpipe_flush_input(&tx_pipe); + cpipe_flush_input(&con->iproto_thread->tx_pipe); return 0; } const char *reqstart = in->wpos - con->parse_size; @@ -825,7 +851,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) if (mp_typeof(*pos) != MP_UINT) { errmsg = "packet length"; err_msgpack: - cpipe_flush_input(&tx_pipe); + cpipe_flush_input(&con->iproto_thread->tx_pipe); diag_set(ClientError, ER_INVALID_MSGPACK, errmsg); return -1; @@ -861,7 +887,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) * This can't throw, but should not be * done in case of exception. */ - cpipe_push_input(&tx_pipe, &msg->base); + cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base); n_requests++; /* Request is parsed */ assert(reqend > reqstart); @@ -899,7 +925,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) */ ev_feed_event(con->loop, &con->input, EV_READ); } - cpipe_flush_input(&tx_pipe); + cpipe_flush_input(&con->iproto_thread->tx_pipe); return 0; } @@ -911,7 +937,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) static void iproto_connection_resume(struct iproto_connection *con) { - assert(! iproto_check_msg_max()); + assert(! iproto_check_msg_max(con->iproto_thread)); rlist_del(&con->in_stop_list); /* * Enqueue_batch() stops the connection again, if the @@ -935,15 +961,16 @@ iproto_connection_resume(struct iproto_connection *con) * necessary to use up the limit. */ static void -iproto_resume(void) +iproto_resume(struct iproto_thread *iproto_thread) { - while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) { + while (!iproto_check_msg_max(iproto_thread) && + !rlist_empty(&iproto_thread->stopped_connections)) { /* * Shift from list head to ensure strict FIFO * (fairness) for resumed connections. */ struct iproto_connection *con = - rlist_first_entry(&stopped_connections, + rlist_first_entry(&iproto_thread->stopped_connections, struct iproto_connection, in_stop_list); iproto_connection_resume(con); @@ -965,7 +992,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, * otherwise we might deplete the fiber pool in tx * thread and deadlock. */ - if (iproto_check_msg_max()) { + if (iproto_check_msg_max(con->iproto_thread)) { iproto_connection_stop_msg_max_limit(con); return; } @@ -990,7 +1017,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, return; } /* Count statistics */ - rmean_collect(rmean_net, IPROTO_RECEIVED, nrd); + rmean_collect(con->iproto_thread->rmean, + IPROTO_RECEIVED, nrd); /* Update the read position and connection state. */ in->wpos += nrd; @@ -1049,7 +1077,7 @@ iproto_flush(struct iproto_connection *con) if (nwr > 0) { /* Count statistics */ - rmean_collect(rmean_net, IPROTO_SENT, nwr); + rmean_collect(con->iproto_thread->rmean, IPROTO_SENT, nwr); if (begin->used + nwr == end->used) { *begin = *end; return 0; @@ -1094,22 +1122,25 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, } static struct iproto_connection * -iproto_connection_new(int fd) +iproto_connection_new(struct iproto_thread *iproto_thread, int fd) { struct iproto_connection *con = (struct iproto_connection *) - mempool_alloc(&iproto_connection_pool); + mempool_alloc(&iproto_thread->iproto_connection_pool); if (con == NULL) { diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con"); return NULL; } + con->iproto_thread = iproto_thread; con->input.data = con->output.data = con; con->loop = loop(); ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ); ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead); ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead); - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead); - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead); + obuf_create(&con->obuf[0], &con->iproto_thread->net_slabc, + iproto_readahead); + obuf_create(&con->obuf[1], &con->iproto_thread->net_slabc, + iproto_readahead); con->p_ibuf = &con->ibuf[0]; con->tx.p_obuf = &con->obuf[0]; iproto_wpos_create(&con->wpos, con->tx.p_obuf); @@ -1119,12 +1150,12 @@ iproto_connection_new(int fd) con->session = NULL; rlist_create(&con->in_stop_list); /* It may be very awkward to allocate at close. */ - cmsg_init(&con->destroy_msg, destroy_route); - cmsg_init(&con->disconnect_msg, disconnect_route); + cmsg_init(&con->destroy_msg, con->iproto_thread->destroy_route); + cmsg_init(&con->disconnect_msg, con->iproto_thread->disconnect_route); con->state = IPROTO_CONNECTION_ALIVE; con->tx.is_push_pending = false; con->tx.is_push_sent = false; - rmean_collect(rmean_net, IPROTO_CONNECTIONS, 1); + rmean_collect(iproto_thread->rmean, IPROTO_CONNECTIONS, 1); return con; } @@ -1147,7 +1178,7 @@ iproto_connection_delete(struct iproto_connection *con) con->obuf[0].iov[0].iov_base == NULL); assert(con->obuf[1].pos == 0 && con->obuf[1].iov[0].iov_base == NULL); - mempool_free(&iproto_connection_pool, con); + mempool_free(&con->iproto_thread->iproto_connection_pool, con); } /* }}} iproto_connection */ @@ -1190,68 +1221,12 @@ net_end_join(struct cmsg *msg); static void net_end_subscribe(struct cmsg *msg); -static const struct cmsg_hop misc_route[] = { - { tx_process_misc, &net_pipe }, - { net_send_msg, NULL }, -}; - -static const struct cmsg_hop call_route[] = { - { tx_process_call, &net_pipe }, - { net_send_msg, NULL }, -}; - -static const struct cmsg_hop select_route[] = { - { tx_process_select, &net_pipe }, - { net_send_msg, NULL }, -}; - -static const struct cmsg_hop process1_route[] = { - { tx_process1, &net_pipe }, - { net_send_msg, NULL }, -}; - -static const struct cmsg_hop sql_route[] = { - { tx_process_sql, &net_pipe }, - { net_send_msg, NULL }, -}; - -static const struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX] = { - NULL, /* IPROTO_OK */ - select_route, /* IPROTO_SELECT */ - process1_route, /* IPROTO_INSERT */ - process1_route, /* IPROTO_REPLACE */ - process1_route, /* IPROTO_UPDATE */ - process1_route, /* IPROTO_DELETE */ - call_route, /* IPROTO_CALL_16 */ - misc_route, /* IPROTO_AUTH */ - call_route, /* IPROTO_EVAL */ - process1_route, /* IPROTO_UPSERT */ - call_route, /* IPROTO_CALL */ - sql_route, /* IPROTO_EXECUTE */ - NULL, /* IPROTO_NOP */ - sql_route, /* IPROTO_PREPARE */ -}; - -static const struct cmsg_hop join_route[] = { - { tx_process_replication, &net_pipe }, - { net_end_join, NULL }, -}; - -static const struct cmsg_hop subscribe_route[] = { - { tx_process_replication, &net_pipe }, - { net_end_subscribe, NULL }, -}; - -static const struct cmsg_hop error_route[] = { - { tx_reply_iproto_error, &net_pipe }, - { net_send_error, NULL }, -}; - static void iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, bool *stop_input) { uint8_t type; + struct iproto_thread *iproto_thread = msg->connection->iproto_thread; if (xrow_header_decode(&msg->header, pos, reqend, true)) goto error; @@ -1274,43 +1249,44 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, if (xrow_decode_dml(&msg->header, &msg->dml, dml_request_key_map(type))) goto error; - assert(type < sizeof(dml_route)/sizeof(*dml_route)); - cmsg_init(&msg->base, dml_route[type]); + assert(type < sizeof(iproto_thread->dml_route) / + sizeof(*(iproto_thread->dml_route))); + cmsg_init(&msg->base, iproto_thread->dml_route[type]); break; case IPROTO_CALL_16: case IPROTO_CALL: case IPROTO_EVAL: if (xrow_decode_call(&msg->header, &msg->call)) goto error; - cmsg_init(&msg->base, call_route); + cmsg_init(&msg->base, iproto_thread->call_route); break; case IPROTO_EXECUTE: case IPROTO_PREPARE: if (xrow_decode_sql(&msg->header, &msg->sql) != 0) goto error; - cmsg_init(&msg->base, sql_route); + cmsg_init(&msg->base, iproto_thread->sql_route); break; case IPROTO_PING: - cmsg_init(&msg->base, misc_route); + cmsg_init(&msg->base, iproto_thread->misc_route); break; case IPROTO_JOIN: case IPROTO_FETCH_SNAPSHOT: case IPROTO_REGISTER: - cmsg_init(&msg->base, join_route); + cmsg_init(&msg->base, iproto_thread->join_route); *stop_input = true; break; case IPROTO_SUBSCRIBE: - cmsg_init(&msg->base, subscribe_route); + cmsg_init(&msg->base, iproto_thread->subscribe_route); *stop_input = true; break; case IPROTO_VOTE_DEPRECATED: case IPROTO_VOTE: - cmsg_init(&msg->base, misc_route); + cmsg_init(&msg->base, iproto_thread->misc_route); break; case IPROTO_AUTH: if (xrow_decode_auth(&msg->header, &msg->auth)) goto error; - cmsg_init(&msg->base, misc_route); + cmsg_init(&msg->base, iproto_thread->misc_route); break; default: diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, @@ -1323,7 +1299,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, diag_log(); diag_create(&msg->diag); diag_move(&fiber()->diag, &msg->diag); - cmsg_init(&msg->base, error_route); + cmsg_init(&msg->base, iproto_thread->error_route); } static void @@ -1436,11 +1412,12 @@ net_discard_input(struct cmsg *m) static void tx_discard_input(struct iproto_msg *msg) { + struct iproto_thread *iproto_thread = msg->connection->iproto_thread; static const struct cmsg_hop discard_input_route[] = { { net_discard_input, NULL }, }; cmsg_init(&msg->discard_input, discard_input_route); - cpipe_push(&net_pipe, &msg->discard_input); + cpipe_push(&iproto_thread->net_pipe, &msg->discard_input); } /** @@ -1990,7 +1967,8 @@ net_send_greeting(struct cmsg *m) if (nwr > 0) { /* Count statistics. */ - rmean_collect(rmean_net, IPROTO_SENT, nwr); + rmean_collect(con->iproto_thread->rmean, + IPROTO_SENT, nwr); } else if (nwr < 0 && ! sio_wouldblock(errno)) { diag_log(); } @@ -2011,24 +1989,23 @@ net_send_greeting(struct cmsg *m) iproto_msg_delete(msg); } -static const struct cmsg_hop connect_route[] = { - { tx_process_connect, &net_pipe }, - { net_send_greeting, NULL }, -}; - /** }}} */ /** * Create a connection and start input. */ static int -iproto_on_accept(struct evio_service * /* service */, int fd, +iproto_on_accept(struct evio_service *service, int fd, struct sockaddr *addr, socklen_t addrlen) { (void) addr; (void) addrlen; struct iproto_msg *msg; - struct iproto_connection *con = iproto_connection_new(fd); + + struct iproto_thread *iproto_thread = + (struct iproto_thread*)service->on_accept_param; + struct iproto_connection *con = + iproto_connection_new(iproto_thread, fd); if (con == NULL) return -1; /* @@ -2038,50 +2015,57 @@ iproto_on_accept(struct evio_service * /* service */, int fd, */ msg = iproto_msg_new(con); if (msg == NULL) { - mempool_free(&iproto_connection_pool, con); + mempool_free(&con->iproto_thread->iproto_connection_pool, con); return -1; } - cmsg_init(&msg->base, connect_route); + cmsg_init(&msg->base, iproto_thread->connect_route); msg->p_ibuf = con->p_ibuf; msg->wpos = con->wpos; - cpipe_push(&tx_pipe, &msg->base); + cpipe_push(&iproto_thread->tx_pipe, &msg->base); return 0; } -static struct evio_service binary; /* iproto binary listener */ - /** * The network io thread main function: * begin serving the message bus. */ static int -net_cord_f(va_list /* ap */) +net_cord_f(va_list ap) { - mempool_create(&iproto_msg_pool, &cord()->slabc, + struct iproto_thread *iproto_thread = + va_arg(ap, struct iproto_thread *); + + mempool_create(&iproto_thread->iproto_msg_pool, &cord()->slabc, sizeof(struct iproto_msg)); - mempool_create(&iproto_connection_pool, &cord()->slabc, + mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc, sizeof(struct iproto_connection)); - evio_service_init(loop(), &binary, "binary", - iproto_on_accept, NULL); + evio_service_init(loop(), &iproto_thread->binary, "binary", + iproto_on_accept, iproto_thread); + + char endpoint_name[ENDPOINT_NAME_MAX]; + snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u", + iproto_thread->id); struct cbus_endpoint endpoint; /* Create "net" endpoint. */ - cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber()); + cbus_endpoint_create(&endpoint, endpoint_name, + fiber_schedule_cb, fiber()); /* Create a pipe to "tx" thread. */ - cpipe_create(&tx_pipe, "tx"); - cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2); + cpipe_create(&iproto_thread->tx_pipe, "tx"); + cpipe_set_max_input(&iproto_thread->tx_pipe, iproto_msg_max / 2); + /* Process incomming messages. */ cbus_loop(&endpoint); - cpipe_destroy(&tx_pipe); + cpipe_destroy(&iproto_thread->tx_pipe); /* * Nothing to do in the fiber so far, the service * will take care of creating events for incoming * connections. */ - if (evio_service_is_active(&binary)) - evio_service_stop(&binary); + if (evio_service_is_active(&iproto_thread->binary)) + evio_service_stop(&iproto_thread->binary); return 0; } @@ -2124,11 +2108,12 @@ static void tx_begin_push(struct iproto_connection *con) { assert(! con->tx.is_push_sent); - cmsg_init(&con->kharon.base, push_route); + cmsg_init(&con->kharon.base, con->iproto_thread->push_route); iproto_wpos_create(&con->kharon.wpos, con->tx.p_obuf); con->tx.is_push_pending = false; con->tx.is_push_sent = true; - cpipe_push(&net_pipe, (struct cmsg *) &con->kharon); + cpipe_push(&con->iproto_thread->net_pipe, + (struct cmsg *) &con->kharon); } static void @@ -2176,41 +2161,166 @@ iproto_session_push(struct session *session, struct port *port) /** }}} */ -/** Initialize the iproto subsystem and start network io thread */ -void -iproto_init(void) -{ - slab_cache_create(&net_slabc, &runtime); +static inline void +iproto_thread_init_routes(struct iproto_thread *iproto_thread) +{ + iproto_thread->destroy_route[0] = + { tx_process_destroy, &iproto_thread->net_pipe }; + iproto_thread->destroy_route[1] = + { net_finish_destroy, NULL }; + iproto_thread->disconnect_route[0] = + { tx_process_disconnect, &iproto_thread->net_pipe }; + iproto_thread->disconnect_route[1] = + { net_finish_disconnect, NULL }; + iproto_thread->misc_route[0] = + { tx_process_misc, &iproto_thread->net_pipe }; + iproto_thread->misc_route[1] = { net_send_msg, NULL }; + iproto_thread->call_route[0] = + { tx_process_call, &iproto_thread->net_pipe }; + iproto_thread->call_route[1] = { net_send_msg, NULL }; + iproto_thread->select_route[0] = + { tx_process_select, &iproto_thread->net_pipe }; + iproto_thread->select_route[1] = { net_send_msg, NULL }; + iproto_thread->process1_route[0] = + { tx_process1, &iproto_thread->net_pipe }; + iproto_thread->process1_route[1] = { net_send_msg, NULL }; + iproto_thread->sql_route[0] = + { tx_process_sql, &iproto_thread->net_pipe }; + iproto_thread->sql_route[1] = { net_send_msg, NULL }; + iproto_thread->join_route[0] = + { tx_process_replication, &iproto_thread->net_pipe }; + iproto_thread->join_route[1] = { net_end_join, NULL }; + iproto_thread->subscribe_route[0] = + { tx_process_replication, &iproto_thread->net_pipe }; + iproto_thread->subscribe_route[1] = { net_end_subscribe, NULL }; + iproto_thread->error_route[0] = + { tx_reply_iproto_error, &iproto_thread->net_pipe }; + iproto_thread->error_route[1] = { net_send_error, NULL }; + iproto_thread->push_route[0] = + { iproto_process_push, &iproto_thread->tx_pipe }; + iproto_thread->push_route[1] = { tx_end_push, NULL }; + /* IPROTO_OK */ + iproto_thread->dml_route[0] = NULL; + /* IPROTO_SELECT */ + iproto_thread->dml_route[1] = iproto_thread->select_route; + /* IPROTO_INSERT */ + iproto_thread->dml_route[2] = iproto_thread->process1_route; + /* IPROTO_REPLACE */ + iproto_thread->dml_route[3] = iproto_thread->process1_route; + /* IPROTO_UPDATE */ + iproto_thread->dml_route[4] = iproto_thread->process1_route; + /* IPROTO_DELETE */ + iproto_thread->dml_route[5] = iproto_thread->process1_route; + /* IPROTO_CALL_16 */ + iproto_thread->dml_route[6] = iproto_thread->call_route; + /* IPROTO_AUTH */ + iproto_thread->dml_route[7] = iproto_thread->misc_route; + /* IPROTO_EVAL */ + iproto_thread->dml_route[8] = iproto_thread->call_route; + /* IPROTO_UPSERT */ + iproto_thread->dml_route[9] = iproto_thread->process1_route; + /* IPROTO_CALL */ + iproto_thread->dml_route[10] = iproto_thread->call_route; + /* IPROTO_EXECUTE */ + iproto_thread->dml_route[11] = iproto_thread->sql_route; + /* IPROTO_NOP */ + iproto_thread->dml_route[12] = NULL; + /* IPROTO_PREPARE */ + iproto_thread->dml_route[13] = iproto_thread->sql_route; + iproto_thread->connect_route[0] = + { tx_process_connect, &iproto_thread->net_pipe }; + iproto_thread->connect_route[1] = { net_send_greeting, NULL }; +}; +static inline int +iproto_thread_init(struct iproto_thread *iproto_thread) +{ + iproto_thread_init_routes(iproto_thread); + slab_cache_create(&iproto_thread->net_slabc, &runtime); /* Init statistics counter */ - rmean_net = rmean_new(rmean_net_strings, IPROTO_LAST); - if (rmean_net == NULL) { - slab_cache_destroy(&net_slabc); - tnt_raise(OutOfMemory, sizeof(struct rmean), - "rmean", "struct rmean"); + iproto_thread->rmean = rmean_new(rmean_net_strings, IPROTO_LAST); + if (iproto_thread->rmean == NULL) { + slab_cache_destroy(&iproto_thread->net_slabc); + diag_set(OutOfMemory, sizeof(struct rmean), + "rmean_new", "struct rmean"); + return -1; } - if (cord_costart(&net_cord, "iproto", net_cord_f, NULL)) { - rmean_delete(rmean_net); - slab_cache_destroy(&net_slabc); - panic("failed to initialize iproto thread"); - } + return 0; +} - /* Create a pipe to "net" thread. */ - cpipe_create(&net_pipe, "net"); - cpipe_set_max_input(&net_pipe, iproto_msg_max / 2); +/** Initialize the iproto subsystem and start network io thread */ +void +iproto_init(int threads_count) +{ + iproto_threads_count = 0; struct session_vtab iproto_session_vtab = { /* .push = */ iproto_session_push, /* .fd = */ iproto_session_fd, /* .sync = */ iproto_session_sync, }; + + + iproto_threads = (struct iproto_thread *) + calloc(threads_count, sizeof(struct iproto_thread)); + if (iproto_threads == NULL) { + tnt_raise(OutOfMemory, threads_count * + sizeof(struct iproto_thread), "calloc", + "struct iproto_thread"); + } + + for (int i = 0; i < threads_count; i++, iproto_threads_count++) { + struct iproto_thread *iproto_thread = &iproto_threads[i]; + iproto_thread->id = i; + if (iproto_thread_init(iproto_thread) != 0) + goto fail; + + if (cord_costart(&iproto_thread->net_cord, "iproto", + net_cord_f, iproto_thread)) { + rmean_delete(iproto_thread->rmean); + slab_cache_destroy(&iproto_thread->net_slabc); + goto fail; + } + /* Create a pipe to "net" thread. */ + iproto_thread->stopped_connections = + RLIST_HEAD_INITIALIZER(iproto_thread-> + stopped_connections); + char endpoint_name[ENDPOINT_NAME_MAX]; + snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u", + iproto_thread->id); + cpipe_create(&iproto_thread->net_pipe, endpoint_name); + cpipe_set_max_input(&iproto_thread->net_pipe, + iproto_msg_max / 2); + } + session_vtab_registry[SESSION_TYPE_BINARY] = iproto_session_vtab; + return; + +fail: + iproto_free(); + diag_raise(); } /** Available iproto configuration changes. */ enum iproto_cfg_op { + /** Command code to set max input for iproto thread */ IPROTO_CFG_MSG_MAX, - IPROTO_CFG_LISTEN + /** + * Command code to start listen socket contained + * in evio_service object + */ + IPROTO_CFG_LISTEN, + /** + * Command code to stop listen socket contained + * in evio_service object. In case when user sets + * new parameters for iproto, it is necessary to stop + * listen sockets in iproto threads before reconfiguration. + */ + IPROTO_CFG_STOP, + /** + * Command code do get statistic from iproto thread + */ + IPROTO_CFG_STAT, }; /** @@ -2224,20 +2334,32 @@ struct iproto_cfg_msg: public cbus_call_msg /** Operation to execute in iproto thread. */ enum iproto_cfg_op op; union { + /** statistic stucture */ struct { - /** New URI to bind to. */ - const char *uri; - /** Result address. */ - struct sockaddr_storage addr; - /** Address length. */ - socklen_t addrlen; + size_t mem_used; + size_t connections; + size_t requests; }; + /** Pointer to evio_service, used for bind */ + struct evio_service *binary; /** New iproto max message count. */ int iproto_msg_max; }; + struct iproto_thread *iproto_thread; }; +static inline void +iproto_thread_fill_binary(struct iproto_thread *iproto_thread, + struct evio_service *binary) +{ + strcpy(iproto_thread->binary.host, binary->host); + strcpy(iproto_thread->binary.serv, binary->serv); + iproto_thread->binary.addrstorage = binary->addrstorage; + iproto_thread->binary.addr_len = binary->addr_len; + ev_io_set(&iproto_thread->binary.ev, binary->ev.fd, EV_READ); +} + static inline void iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op) { @@ -2245,30 +2367,52 @@ iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op) msg->op = op; } +static void +iproto_fill_stat(struct iproto_thread *iproto_thread, + struct iproto_cfg_msg *cfg_msg) +{ + cfg_msg->mem_used = + slab_cache_used(&iproto_thread->net_cord.slabc) + + slab_cache_used(&iproto_thread->net_slabc); + cfg_msg->connections = + mempool_count(&iproto_thread->iproto_connection_pool); + cfg_msg->requests = + mempool_count(&iproto_thread->iproto_msg_pool); +} + static int iproto_do_cfg_f(struct cbus_call_msg *m) { struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m; int old; + struct iproto_thread *iproto_thread = cfg_msg->iproto_thread; + try { switch (cfg_msg->op) { case IPROTO_CFG_MSG_MAX: - cpipe_set_max_input(&tx_pipe, + cpipe_set_max_input(&iproto_thread->tx_pipe, cfg_msg->iproto_msg_max / 2); old = iproto_msg_max; iproto_msg_max = cfg_msg->iproto_msg_max; if (old < iproto_msg_max) - iproto_resume(); + iproto_resume(iproto_thread); break; case IPROTO_CFG_LISTEN: - if (evio_service_is_active(&binary)) - evio_service_stop(&binary); - if (cfg_msg->uri != NULL && - (evio_service_bind(&binary, cfg_msg->uri) != 0 || - evio_service_listen(&binary) != 0)) + if (evio_service_is_active(&iproto_thread->binary)) { + diag_set(ClientError, ER_UNSUPPORTED, "Iproto", + "listen if service already active"); diag_raise(); - cfg_msg->addrlen = binary.addr_len; - cfg_msg->addr = binary.addrstorage; + } + iproto_thread_fill_binary(iproto_thread, cfg_msg->binary); + if (evio_service_listen(&iproto_thread->binary) != 0) + diag_raise(); + break; + case IPROTO_CFG_STOP: + if (evio_service_is_active(&iproto_thread->binary)) + evio_service_stop(&iproto_thread->binary); + break; + case IPROTO_CFG_STAT: + iproto_fill_stat(iproto_thread, cfg_msg); break; default: unreachable(); @@ -2276,50 +2420,129 @@ iproto_do_cfg_f(struct cbus_call_msg *m) } catch (Exception *e) { return -1; } + return 0; } static inline void -iproto_do_cfg(struct iproto_cfg_msg *msg) +iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg) { - if (cbus_call(&net_pipe, &tx_pipe, msg, iproto_do_cfg_f, - NULL, TIMEOUT_INFINITY) != 0) + msg->iproto_thread = iproto_thread; + if (cbus_call(&iproto_thread->net_pipe, &iproto_thread->tx_pipe, msg, + iproto_do_cfg_f, NULL, TIMEOUT_INFINITY) != 0) diag_raise(); } -void -iproto_listen(const char *uri) +static inline void +iproto_send_stop_msg(void) +{ + struct iproto_cfg_msg cfg_msg; + iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STOP); + for (int i = 0; i < iproto_threads_count; i++) + iproto_do_cfg(&iproto_threads[i], &cfg_msg); +} + +static inline void +iproto_send_listen_msg(struct evio_service *binary) { struct iproto_cfg_msg cfg_msg; iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_LISTEN); - cfg_msg.uri = uri; - iproto_do_cfg(&cfg_msg); - iproto_bound_address_storage = cfg_msg.addr; - iproto_bound_address_len = cfg_msg.addrlen; + cfg_msg.binary = binary; + for (int i = 0; i < iproto_threads_count; i++) + iproto_do_cfg(&iproto_threads[i], &cfg_msg); +} + +void +iproto_listen(const char *uri) +{ + struct evio_service binary; + memset(&binary, 0, sizeof(binary)); + + iproto_send_stop_msg(); + if (uri != NULL) { + /* + * Please note, we bind socket in main thread, and then + * listen this socket in all iproto threads! With this + * implementation, we rely on the Linux kernel to distribute + * incoming connections across iproto threads. + */ + if (evio_service_bind(&binary, uri) != 0) + diag_raise(); + iproto_send_listen_msg(&binary); + } + + iproto_bound_address_storage = binary.addrstorage; + iproto_bound_address_len = binary.addr_len; } size_t iproto_mem_used(void) { - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc); + struct iproto_cfg_msg cfg_msg; + size_t mem = 0; + iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STAT); +#ifndef NDEBUG + struct errinj *inj = + errinj(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT); + if (inj->iparam >= 0 && inj->iparam < iproto_threads_count) { + iproto_do_cfg(&iproto_threads[inj->iparam], &cfg_msg); + return cfg_msg.mem_used; + } +#endif + for (int i = 0; i < iproto_threads_count; i++) { + iproto_do_cfg(&iproto_threads[i], &cfg_msg); + mem += cfg_msg.mem_used; + } + return mem; } size_t iproto_connection_count(void) { - return mempool_count(&iproto_connection_pool); + struct iproto_cfg_msg cfg_msg; + size_t count = 0; + iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STAT); +#ifndef NDEBUG + struct errinj *inj = + errinj(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT); + if (inj->iparam >= 0 && inj->iparam < iproto_threads_count) { + iproto_do_cfg(&iproto_threads[inj->iparam], &cfg_msg); + return cfg_msg.connections; + } +#endif + for (int i = 0; i < iproto_threads_count; i++) { + iproto_do_cfg(&iproto_threads[i], &cfg_msg); + count += cfg_msg.connections; + } + return count; } size_t iproto_request_count(void) { - return mempool_count(&iproto_msg_pool); + struct iproto_cfg_msg cfg_msg; + size_t count = 0; + iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STAT); +#ifndef NDEBUG + struct errinj *inj = + errinj(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT); + if (inj->iparam >= 0 && inj->iparam < iproto_threads_count) { + iproto_do_cfg(&iproto_threads[inj->iparam], &cfg_msg); + return cfg_msg.requests; + } +#endif + for (int i = 0; i < iproto_threads_count; i++) { + iproto_do_cfg(&iproto_threads[i], &cfg_msg); + count += cfg_msg.requests; + } + return count; } void iproto_reset_stat(void) { - rmean_cleanup(rmean_net); + for (int i = 0; i < iproto_threads_count; i++) + rmean_cleanup(iproto_threads[i].rmean); } void @@ -2333,23 +2556,51 @@ iproto_set_msg_max(int new_iproto_msg_max) struct iproto_cfg_msg cfg_msg; iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_MSG_MAX); cfg_msg.iproto_msg_max = new_iproto_msg_max; - iproto_do_cfg(&cfg_msg); - cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2); + for (int i = 0; i < iproto_threads_count; i++) { + iproto_do_cfg(&iproto_threads[i], &cfg_msg); + cpipe_set_max_input(&iproto_threads[i].net_pipe, + new_iproto_msg_max / 2); + } } void iproto_free(void) { - tt_pthread_cancel(net_cord.id); - tt_pthread_join(net_cord.id, NULL); - /* - * Close socket descriptor to prevent hot standby instance - * failing to bind in case it tries to bind before socket - * is closed by OS. - */ - if (evio_service_is_active(&binary)) - close(binary.ev.fd); - - rmean_delete(rmean_net); - slab_cache_destroy(&net_slabc); + for (int i = 0; i < iproto_threads_count; i++) { + tt_pthread_cancel(iproto_threads[i].net_cord.id); + tt_pthread_join(iproto_threads[i].net_cord.id, NULL); + /* + * Close socket descriptor to prevent hot standby instance + * failing to bind in case it tries to bind before socket + * is closed by OS. + */ + if (evio_service_is_active(&iproto_threads[i].binary)) + close(iproto_threads[i].binary.ev.fd); + + rmean_delete(iproto_threads[i].rmean); + slab_cache_destroy(&iproto_threads[i].net_slabc); + } + free(iproto_threads); +} + +int +iproto_rmean_foreach(void *cb, void *cb_ctx) +{ + struct errinj *inj = + errinj(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT); + for (size_t i = 0; i < IPROTO_LAST; i++) { + int64_t mean = 0; + int64_t total = 0; + for (int j = 0; j < iproto_threads_count; j++) { + if (inj != NULL && inj->iparam >= 0 && inj->iparam != j) + continue; + mean += rmean_mean(iproto_threads[j].rmean, i); + total += rmean_total(iproto_threads[j].rmean, i); + } + int rc = ((rmean_cb)cb)(rmean_net_strings[i], mean, + total, cb_ctx); + if (rc != 0) + return rc; + } + return 0; } diff --git a/src/box/iproto.h b/src/box/iproto.h index f6f7101a19d7b9ab81029d4173a59a793c8e2ee2..d360f65e4afa77af250246c0514e7ea2110cc866 100644 --- a/src/box/iproto.h +++ b/src/box/iproto.h @@ -87,11 +87,14 @@ iproto_reset_stat(void); const char * iproto_bound_address(char *buf); +int +iproto_rmean_foreach(void *cb, void *cb_ctx); + #if defined(__cplusplus) } /* extern "C" */ void -iproto_init(void); +iproto_init(int threads_count); void iproto_listen(const char *uri); diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 885a0cac1baebccdd1f0d9e588c1a0c0eb302feb..e10e33daefec348ccd2abc42cfa310e7e63cc9a1 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -44,6 +44,7 @@ local default_cfg = { memtx_max_tuple_size = 1024 * 1024, slab_alloc_granularity = 8, slab_alloc_factor = 1.05, + iproto_threads = 1, work_dir = nil, memtx_dir = ".", wal_dir = ".", @@ -128,6 +129,7 @@ local template_cfg = { memtx_max_tuple_size = 'number', slab_alloc_granularity = 'number', slab_alloc_factor = 'number', + iproto_threads = 'number', work_dir = 'string', memtx_dir = 'string', wal_dir = 'string', diff --git a/src/box/lua/stat.c b/src/box/lua/stat.c index 29ec38b26be0ed8726f68a59b421e8d612010507..2ad6bd478c0062bff115972c24c67d81a8fa63f0 100644 --- a/src/box/lua/stat.c +++ b/src/box/lua/stat.c @@ -49,8 +49,6 @@ extern struct rmean *rmean_box; extern struct rmean *rmean_error; -/** network statistics (iproto & cbus) */ -extern struct rmean *rmean_net; extern struct rmean *rmean_tx_wal_bus; static void @@ -148,7 +146,7 @@ static int lbox_stat_net_index(struct lua_State *L) { const char *key = luaL_checkstring(L, -1); - if (rmean_foreach(rmean_net, seek_stat_item, L) == 0) + if (iproto_rmean_foreach(seek_stat_item, L) == 0) return 0; if (strcmp(key, "CONNECTIONS") == 0) { @@ -183,7 +181,7 @@ static int lbox_stat_net_call(struct lua_State *L) { lua_newtable(L); - rmean_foreach(rmean_net, set_stat_item, L); + iproto_rmean_foreach(set_stat_item, L); lua_pushstring(L, "CONNECTIONS"); lua_rawget(L, -2); diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h index 756899eff4c34d34197c3a6b75fc0bfe82bea90f..45ad22c4c5e67ffc506700ab70ec790bad7ba3b5 100644 --- a/src/lib/core/errinj.h +++ b/src/lib/core/errinj.h @@ -151,6 +151,7 @@ struct errinj { _(ERRINJ_APPLIER_SLOW_ACK, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_STDIN_ISATTY, ERRINJ_INT, {.iparam = -1}) \ _(ERRINJ_SNAP_COMMIT_FAIL, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT, {.iparam = -1}) \ ENUM0(errinj_id, ERRINJ_LIST); extern struct errinj errinjs[]; diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index 1fdd9a22726c70a334e5b62f7bb2742c308a5fe8..3e5caf448c898ad707f3c8354c4affac2e2bb7d9 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -16,6 +16,7 @@ feedback_host:https://feedback.tarantool.io feedback_interval:3600 force_recovery:false hot_standby:false +iproto_threads:1 listen:port log:tarantool.log log_format:plain diff --git a/test/box/admin.result b/test/box/admin.result index f8e8808e320598a191894b363b864fb4e132409e..6d307eeccf26b3a1ecaf89eefc03bbe2e74c52bc 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -53,6 +53,8 @@ cfg_filter(box.cfg) - false - - hot_standby - false + - - iproto_threads + - 1 - - listen - <hidden> - - log diff --git a/test/box/cfg.result b/test/box/cfg.result index 693c1b521648bbc4c170adc437e4b5245a29f099..6965933b59bc2654768542a7cfc7cdff105d761b 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -41,6 +41,8 @@ cfg_filter(box.cfg) | - false | - - hot_standby | - false + | - - iproto_threads + | - 1 | - - listen | - <hidden> | - - log @@ -162,6 +164,8 @@ cfg_filter(box.cfg) | - false | - - hot_standby | - false + | - - iproto_threads + | - 1 | - - listen | - <hidden> | - - log diff --git a/test/box/errinj.result b/test/box/errinj.result index d1cbacd15782b440856cd4f283766a48fac3fca9..44ecafc40fc3fba22110488bf43fb442c298e4d6 100644 --- a/test/box/errinj.result +++ b/test/box/errinj.result @@ -57,6 +57,7 @@ evals - ERRINJ_HTTP_RESPONSE_ADD_WAIT: false - ERRINJ_INDEX_ALLOC: false - ERRINJ_INDEX_RESERVE: false + - ERRINJ_IPROTO_SINGLE_THREAD_STAT: -1 - ERRINJ_IPROTO_TX_DELAY: false - ERRINJ_LOG_ROTATE: false - ERRINJ_MEMTX_DELAY_GC: false diff --git a/test/box/gh-5645-several-iproto-threads.lua b/test/box/gh-5645-several-iproto-threads.lua new file mode 100755 index 0000000000000000000000000000000000000000..5617c97adfaaece8d831d4f860a3525773e9e374 --- /dev/null +++ b/test/box/gh-5645-several-iproto-threads.lua @@ -0,0 +1,17 @@ +#!/usr/bin/env tarantool + +require('console').listen(os.getenv('ADMIN')) + +box.cfg({ + listen = os.getenv('LISTEN'), + iproto_threads = tonumber(arg[1]), + wal_mode='none' +}) + +box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe') +function errinj_set(thread_id) + if thread_id ~= nil then + box.error.injection.set("ERRINJ_IPROTO_SINGLE_THREAD_STAT", thread_id) + end +end +function ping() return "pong" end diff --git a/test/box/gh-5645-several-iproto-threads.result b/test/box/gh-5645-several-iproto-threads.result new file mode 100644 index 0000000000000000000000000000000000000000..f6bd5f968684579d2e3e8861dff6173bd18d1d42 --- /dev/null +++ b/test/box/gh-5645-several-iproto-threads.result @@ -0,0 +1,162 @@ +-- test-run result file version 2 +env = require('test_run') + | --- + | ... +net_box = require('net.box') + | --- + | ... +fiber = require('fiber') + | --- + | ... +test_run = env.new() + | --- + | ... +test_run:cmd("create server test with script='box/gh-5645-several-iproto-threads.lua'") + | --- + | - true + | ... + +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +function iproto_call(server_addr, fibers_count) + local fibers = {} + for i = 1, fibers_count do + fibers[i] = fiber.new(function() + local conn = net_box.new(server_addr) + for _ = 1, 100 do + pcall(conn.call, conn, 'ping') + end + conn:close() + end) + fibers[i]:set_joinable(true) + end + for _, f in ipairs(fibers) do + f:join() + end +end; + | --- + | ... +function get_network_stat() + local total_net_stat_table = test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table ~= nil) + local connections = 0 + local requests = 0 + local sent = 0 + local received = 0 + for name, net_stat_table in pairs(total_net_stat_table) do + assert(net_stat_table ~= nil) + if name == "CONNECTIONS" then + for name, val in pairs(net_stat_table) do + if name == "total" then + connections = val + end + end + elseif name == "REQUESTS" then + for name, val in pairs(net_stat_table) do + if name == "total" then + requests = val + end + end + elseif name == "SENT" then + for name, val in pairs(net_stat_table) do + if name == "total" then + sent = val + end + end + elseif name == "RECEIVED" then + for name, val in pairs(net_stat_table) do + if name == "total" then + received = val + end + end + else + assert(false) + end + end + return connections, requests, sent, received +end +test_run:cmd("setopt delimiter ''"); + | --- + | ... + +-- We check that statistics gathered per each thread in sum is equal to +-- statistics gathered from all threads. +-- +thread_count = 2 + | --- + | ... +fibers_count = 100 + | --- + | ... +test_run:cmd(string.format("start server test with args=\"%s\"", thread_count)) + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... +iproto_call(server_addr, fibers_count) + | --- + | ... +-- Total statistics from all threads. +conn_t, req_t, sent_t, rec_t = get_network_stat() + | --- + | ... +-- Statistics per thread. +conn, req, sent, rec = 0, 0, 0, 0 + | --- + | ... +assert(conn_t == fibers_count) + | --- + | - true + | ... + +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +for thread_id = 1, thread_count do + test_run:eval("test", string.format("errinj_set(%d)", thread_id - 1)) + local conn_c, req_c, sent_c, rec_c = get_network_stat() + conn = conn + conn_c + req = req + req_c + sent = sent + sent_c + rec = rec + rec_c +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +assert(conn_t == conn) + | --- + | - true + | ... +assert(req_t == req) + | --- + | - true + | ... +assert(sent_t == sent) + | --- + | - true + | ... +assert(rec_t == rec) + | --- + | - true + | ... + +test_run:cmd("stop server test") + | --- + | - true + | ... +test_run:cmd("cleanup server test") + | --- + | - true + | ... +test_run:cmd("delete server test") + | --- + | - true + | ... diff --git a/test/box/gh-5645-several-iproto-threads.test.lua b/test/box/gh-5645-several-iproto-threads.test.lua new file mode 100755 index 0000000000000000000000000000000000000000..267efe072a062731df6cc9b09af6ff406c53e2ce --- /dev/null +++ b/test/box/gh-5645-several-iproto-threads.test.lua @@ -0,0 +1,96 @@ +env = require('test_run') +net_box = require('net.box') +fiber = require('fiber') +test_run = env.new() +test_run:cmd("create server test with script='box/gh-5645-several-iproto-threads.lua'") + +test_run:cmd("setopt delimiter ';'") +function iproto_call(server_addr, fibers_count) + local fibers = {} + for i = 1, fibers_count do + fibers[i] = fiber.new(function() + local conn = net_box.new(server_addr) + for _ = 1, 100 do + pcall(conn.call, conn, 'ping') + end + conn:close() + end) + fibers[i]:set_joinable(true) + end + for _, f in ipairs(fibers) do + f:join() + end +end; +function get_network_stat() + local total_net_stat_table = test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table ~= nil) + local connections = 0 + local requests = 0 + local sent = 0 + local received = 0 + for name, net_stat_table in pairs(total_net_stat_table) do + assert(net_stat_table ~= nil) + if name == "CONNECTIONS" then + for name, val in pairs(net_stat_table) do + if name == "total" then + connections = val + end + end + elseif name == "REQUESTS" then + for name, val in pairs(net_stat_table) do + if name == "total" then + requests = val + end + end + elseif name == "SENT" then + for name, val in pairs(net_stat_table) do + if name == "total" then + sent = val + end + end + elseif name == "RECEIVED" then + for name, val in pairs(net_stat_table) do + if name == "total" then + received = val + end + end + else + assert(false) + end + end + return connections, requests, sent, received +end +test_run:cmd("setopt delimiter ''"); + +-- We check that statistics gathered per each thread in sum is equal to +-- statistics gathered from all threads. +-- +thread_count = 2 +fibers_count = 100 +test_run:cmd(string.format("start server test with args=\"%s\"", thread_count)) +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +iproto_call(server_addr, fibers_count) +-- Total statistics from all threads. +conn_t, req_t, sent_t, rec_t = get_network_stat() +-- Statistics per thread. +conn, req, sent, rec = 0, 0, 0, 0 +assert(conn_t == fibers_count) + +test_run:cmd("setopt delimiter ';'") +for thread_id = 1, thread_count do + test_run:eval("test", string.format("errinj_set(%d)", thread_id - 1)) + local conn_c, req_c, sent_c, rec_c = get_network_stat() + conn = conn + conn_c + req = req + req_c + sent = sent + sent_c + rec = rec + rec_c +end; +test_run:cmd("setopt delimiter ''"); +assert(conn_t == conn) +assert(req_t == req) +assert(sent_t == sent) +assert(rec_t == rec) + +test_run:cmd("stop server test") +test_run:cmd("cleanup server test") +test_run:cmd("delete server test") diff --git a/test/box/suite.ini b/test/box/suite.ini index d5f72e55927bf64de0dc13ff37427b500c00197e..4943974489d8130674e9b63ca34aef8c95596e90 100644 --- a/test/box/suite.ini +++ b/test/box/suite.ini @@ -5,7 +5,7 @@ script = box.lua disabled = rtree_errinj.test.lua tuple_bench.test.lua long_run = huge_field_map_long.test.lua config = engine.cfg -release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua +release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua use_unix_sockets = True use_unix_sockets_iproto = True