From 1dcdc98eca144229dc4851043addcf7a2c8a2a74 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Thu, 3 May 2018 19:06:56 +0300 Subject: [PATCH] iproto: follow up patch for the fix for blocked connection * rename request_limit.test.lua to net_msg_max.test.lua * make net_msg_max.test.lua stable (courtesy of @gerold103) * exclude disconnect messages from iproto_msg_max limit * add a separate warning for throttling based on readahead buffer overflow --- src/box/iproto.cc | 56 +++++++++---------- ...equest_limit.result => net_msg_max.result} | 12 ++-- ...st_limit.test.lua => net_msg_max.test.lua} | 12 ++-- 3 files changed, 37 insertions(+), 43 deletions(-) rename test/box/{request_limit.result => net_msg_max.result} (91%) rename test/box/{request_limit.test.lua => net_msg_max.test.lua} (89%) diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 437acce81f..0b92c316ed 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -355,7 +355,9 @@ struct iproto_connection struct session *session; ev_loop *loop; /* Pre-allocated disconnect msg. */ - struct iproto_msg *disconnect; + struct cmsg disconnect; + /** True if disconnect message is sent. Debug-only. */ + bool is_disconnected; struct rlist in_stop_list; /** * The following fields are used exclusively by the tx thread. @@ -373,15 +375,13 @@ static RLIST_HEAD(stopped_connections); /** * Return true if we have not enough spare messages - * in the message pool. Disconnect messages are - * discounted: they are mostly reserved and idle. + * in the message pool. */ static inline bool -iproto_must_stop_input() +iproto_check_msg_max() { - size_t connection_count = mempool_count(&iproto_connection_pool); size_t request_count = mempool_count(&iproto_msg_pool); - return request_count > connection_count + IPROTO_MSG_MAX; + return request_count > IPROTO_MSG_MAX; } /** @@ -399,7 +399,7 @@ iproto_resume() */ if (rlist_empty(&stopped_connections)) return; - if (iproto_must_stop_input()) + if (iproto_check_msg_max()) return; struct iproto_connection *con; @@ -434,7 +434,7 @@ iproto_connection_is_idle(struct iproto_connection *con) static inline void iproto_connection_stop(struct iproto_connection *con) { - say_warn("readahead limit reached, stopping input on connection %s", + say_warn("net_msg_max limit reached, stopping input on connection %s", sio_socketname(con->input.fd)); assert(rlist_empty(&con->in_stop_list)); ev_io_stop(con->loop, &con->input); @@ -479,10 +479,9 @@ iproto_connection_close(struct iproto_connection *con) * twice. */ if (iproto_connection_is_idle(con)) { - assert(con->disconnect != NULL); - struct iproto_msg *msg = con->disconnect; - con->disconnect = NULL; - cpipe_push(&tx_pipe, &msg->base); + assert(con->is_disconnected == false); + con->is_disconnected = true; + cpipe_push(&tx_pipe, &con->disconnect); } rlist_del(&con->in_stop_list); } @@ -670,12 +669,10 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, } /* * Throttle if there are too many pending requests, - * otherwise we might deplete the fiber pool and - * deadlock (e.g. WAL writer needs a fiber to wake - * another fiber waiting for write to complete). - * Ignore iproto_connection->disconnect messages. + * otherwise we might deplete the fiber pool in tx + * thread and deadlock. */ - if (iproto_must_stop_input()) { + if (iproto_check_msg_max()) { iproto_connection_stop(con); return; } @@ -684,6 +681,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, /* Ensure we have sufficient space for the next round. */ struct ibuf *in = iproto_connection_input_buffer(con); if (in == NULL) { + say_warn("readahead limit reached, stopping input on connection %s", + sio_socketname(con->input.fd)); ev_io_stop(loop, &con->input); return; } @@ -820,8 +819,8 @@ iproto_connection_new(int fd) con->session = NULL; rlist_create(&con->in_stop_list); /* It may be very awkward to allocate at close. */ - con->disconnect = iproto_msg_new(con); - cmsg_init(&con->disconnect->base, disconnect_route); + cmsg_init(&con->disconnect, disconnect_route); + con->is_disconnected = false; return con; } @@ -843,8 +842,6 @@ iproto_connection_delete(struct iproto_connection *con) con->obuf[0].iov[0].iov_base == NULL); assert(con->obuf[1].pos == 0 && con->obuf[1].iov[0].iov_base == NULL); - if (con->disconnect) - iproto_msg_delete(con->disconnect); mempool_free(&iproto_connection_pool, con); } @@ -1032,8 +1029,8 @@ tx_fiber_init(struct session *session, uint64_t sync) static void tx_process_disconnect(struct cmsg *m) { - struct iproto_msg *msg = (struct iproto_msg *) m; - struct iproto_connection *con = msg->connection; + struct iproto_connection *con = + container_of(m, struct iproto_connection, disconnect); if (con->session) { tx_fiber_init(con->session, 0); /* @@ -1061,10 +1058,10 @@ tx_process_disconnect(struct cmsg *m) static void net_finish_disconnect(struct cmsg *m) { - struct iproto_msg *msg = (struct iproto_msg *) m; + struct iproto_connection *con = + container_of(m, struct iproto_connection, disconnect); /* Runs the trigger, which may yield. */ - iproto_connection_delete(msg->connection); - iproto_msg_delete(msg); + iproto_connection_delete(con); } @@ -1084,11 +1081,12 @@ net_discard_input(struct cmsg *m) { struct iproto_msg *msg = container_of(m, struct iproto_msg, discard_input); - struct iproto_connection *conn = msg->connection; + struct iproto_connection *con = msg->connection; msg->p_ibuf->rpos += msg->len; msg->len = 0; - conn->long_poll_requests++; - ev_feed_event(conn->loop, &conn->input, EV_READ); + con->long_poll_requests++; + if (! ev_is_active(&con->input) && rlist_empty(&con->in_stop_list)) + ev_feed_event(con->loop, &con->input, EV_READ); } static void diff --git a/test/box/request_limit.result b/test/box/net_msg_max.result similarity index 91% rename from test/box/request_limit.result rename to test/box/net_msg_max.result index bef998b91b..dde2016b77 100644 --- a/test/box/request_limit.result +++ b/test/box/net_msg_max.result @@ -47,7 +47,7 @@ test_run:cmd("setopt delimiter ';'") function do_long_f(...) active = active + 1 while not continue do - fiber.sleep(0.1) + fiber.sleep(0.01) end active = active - 1 finished = finished + 1 @@ -70,11 +70,9 @@ end; ... -- Wait until 'active' stops growing - it means, that the input -- is blocked. -function wait_block() - local old_val = -1 - while old_val ~= active do - old_val = active - fiber.sleep(0.1) +function wait_active(value) + while value ~= active do + fiber.sleep(0.01) end end; --- @@ -98,7 +96,7 @@ run_workers(conn) run_workers(conn2) --- ... -wait_block() +wait_active(run_max * 2) --- ... active == run_max * 2 or active diff --git a/test/box/request_limit.test.lua b/test/box/net_msg_max.test.lua similarity index 89% rename from test/box/request_limit.test.lua rename to test/box/net_msg_max.test.lua index 2bc35d8fa9..560e37017a 100644 --- a/test/box/request_limit.test.lua +++ b/test/box/net_msg_max.test.lua @@ -20,7 +20,7 @@ test_run:cmd("setopt delimiter ';'") function do_long_f(...) active = active + 1 while not continue do - fiber.sleep(0.1) + fiber.sleep(0.01) end active = active - 1 finished = finished + 1 @@ -40,11 +40,9 @@ end; -- Wait until 'active' stops growing - it means, that the input -- is blocked. -function wait_block() - local old_val = -1 - while old_val ~= active do - old_val = active - fiber.sleep(0.1) +function wait_active(value) + while value ~= active do + fiber.sleep(0.01) end end; @@ -59,7 +57,7 @@ test_run:cmd("setopt delimiter ''"); -- run_workers(conn) run_workers(conn2) -wait_block() +wait_active(run_max * 2) active == run_max * 2 or active wait_finished(active) -- GitLab