diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 81938ce0bff83a61c5f93def361215828540bf3a..437acce81feb6a97cae29e5d4c76fc8b4985b8cd 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -1084,10 +1084,11 @@ net_discard_input(struct cmsg *m) { struct iproto_msg *msg = container_of(m, struct iproto_msg, discard_input); + struct iproto_connection *conn = msg->connection; msg->p_ibuf->rpos += msg->len; msg->len = 0; - msg->connection->long_poll_requests++; - iproto_resume(); + conn->long_poll_requests++; + ev_feed_event(conn->loop, &conn->input, EV_READ); } static void diff --git a/test/box/request_limit.result b/test/box/request_limit.result new file mode 100644 index 0000000000000000000000000000000000000000..bef998b91bd6f378f530a9cd638da621cd0d3813 --- /dev/null +++ b/test/box/request_limit.result @@ -0,0 +1,122 @@ +test_run = require('test_run').new() +--- +... +fiber = require('fiber') +--- +... +net_box = require('net.box') +--- +... +box.schema.user.grant('guest', 'read,write,execute', 'universe') +--- +... +conn = net_box.connect(box.cfg.listen) +--- +... +conn2 = net_box.connect(box.cfg.listen) +--- +... +active = 0 +--- +... +finished = 0 +--- +... +continue = false +--- +... +limit = 768 +--- +... +run_max = (limit - 100) / 2 +--- +... +old_readahead = box.cfg.readahead +--- +... +box.cfg{readahead = 9000} +--- +... +long_str = string.rep('a', 1000) +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function do_long_f(...) + active = active + 1 + while not continue do + fiber.sleep(0.1) + end + active = active - 1 + finished = finished + 1 +end; +--- +... +function do_long(c) + c:call('do_long_f', {long_str}) +end; +--- +... +function run_workers(c) + finished = 0 + continue = false + for i = 1, run_max do + fiber.create(do_long, c) + end +end; +--- +... +-- Wait until 'active' stops growing - it means, that the input +-- is blocked. +function wait_block() + local old_val = -1 + while old_val ~= active do + old_val = active + fiber.sleep(0.1) + end +end; +--- +... +function wait_finished(needed) + continue = true + while finished ~= needed do fiber.sleep(0.01) end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- +-- Test that message count limit is reachable. +-- +run_workers(conn) +--- +... +run_workers(conn2) +--- +... +wait_block() +--- +... +active == run_max * 2 or active +--- +- true +... +wait_finished(active) +--- +... +conn2:close() +--- +... +conn:close() +--- +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') +--- +... +box.cfg{readahead = old_readahead} +--- +... diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua new file mode 100644 index 0000000000000000000000000000000000000000..2bc35d8fa9b5f505cc6e950bca5e753fa2dbcf60 --- /dev/null +++ b/test/box/request_limit.test.lua @@ -0,0 +1,70 @@ +test_run = require('test_run').new() + +fiber = require('fiber') +net_box = require('net.box') + +box.schema.user.grant('guest', 'read,write,execute', 'universe') +conn = net_box.connect(box.cfg.listen) +conn2 = net_box.connect(box.cfg.listen) +active = 0 +finished = 0 +continue = false +limit = 768 +run_max = (limit - 100) / 2 + +old_readahead = box.cfg.readahead +box.cfg{readahead = 9000} +long_str = string.rep('a', 1000) + +test_run:cmd("setopt delimiter ';'") +function do_long_f(...) + active = active + 1 + while not continue do + fiber.sleep(0.1) + end + active = active - 1 + finished = finished + 1 +end; + +function do_long(c) + c:call('do_long_f', {long_str}) +end; + +function run_workers(c) + finished = 0 + continue = false + for i = 1, run_max do + fiber.create(do_long, c) + end +end; + +-- Wait until 'active' stops growing - it means, that the input +-- is blocked. +function wait_block() + local old_val = -1 + while old_val ~= active do + old_val = active + fiber.sleep(0.1) + end +end; + +function wait_finished(needed) + continue = true + while finished ~= needed do fiber.sleep(0.01) end +end; +test_run:cmd("setopt delimiter ''"); + +-- +-- Test that message count limit is reachable. +-- +run_workers(conn) +run_workers(conn2) +wait_block() +active == run_max * 2 or active +wait_finished(active) + +conn2:close() +conn:close() + +box.schema.user.revoke('guest', 'read,write,execute', 'universe') +box.cfg{readahead = old_readahead}