From e513d4ce51d25ae56ca356183e6999ad6c298867 Mon Sep 17 00:00:00 2001 From: Nikolay Shirokovskiy <nshirokovskiy@tarantool.org> Date: Wed, 3 Jul 2024 15:21:52 +0300 Subject: [PATCH] core: fiber pool shutdown Fiber pool shutdown is finishing all idle fibers. Any message processing is finished earlier on client fiber shutdown. We need some changes in shutdown order to make fiber pool shutdown. First we need to move stopping of iproto threads from free to shutdown step. The issue is we want to destroy "tx" endpoint which requires all pipes connected to it to be destroyed first. There are pipes in iproto threads that are connected to "tx". Currently we destroy pipes in free step and at this point, as there is no event loop in tx thread, `cbus_endpoint_destroy` can't receive notifications that pipes are destroyed. Originally we put stopping of iproto threads into the free step because we didn't have client fiber shutdown. So it was convenient to have a working `net_pipe` so that client fibers could use the iproto API without adding extra logic to them. Now I guess it makes sense to stop client fibers before iproto shutdown. This is the second change in shutdown order. There is another reason why we have iproto shutdown before client fiber shutdown. In the process of iproto shutdown we close connections first and then cancel all requests in progress. This way clients do not receive unexpected `FiberIsCancelled` errors in the process of server shutdown. After the patch it is not so. Well, we may close connections as an extra step before client fiber shutdown. But let's leave it this way. Good clients can subscribe to server shutdown and prepare for it. Otherwise they may receive `FiberIsCancelled` for their request, which looks sensible. It also makes sense now to move watcher and client fiber shutdown to `box_shutdown` as we can both use the watcher and create client fibers without starting a box. 
While at it also drop a note in code why we shutdown watcher before even fiber clients shutdown. Part of #9722 NO_CHANGELOG=internal NO_DOC=internal --- src/box/box.cc | 32 ++++++++++++--------- src/box/iproto.cc | 14 +++++---- src/lib/core/fiber_pool.c | 23 ++++++--------- src/lib/core/fiber_pool.h | 4 +++ test/box-luatest/graceful_shutdown_test.lua | 2 +- test/box-luatest/shutdown_test.lua | 30 ++++++++++++++++++- 6 files changed, 71 insertions(+), 34 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index a2df1ab407..5b804b7df4 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -6028,7 +6028,7 @@ box_storage_free(void) flightrec_free(); audit_log_free(); sql_built_in_functions_cache_free(); - /* fiber_pool_destroy(&tx_fiber_pool); */ + fiber_pool_destroy(&tx_fiber_pool); is_storage_initialized = false; } @@ -6088,26 +6088,32 @@ box_storage_shutdown() diag_log(); panic("cannot gracefully shutdown iproto"); } - box_watcher_shutdown(); - /* - * Finish client fibers after iproto_shutdown otherwise new fibers - * can be started through new iproto requests. Also we should - * finish client fibers before other subsystems shutdown so that - * we won't need to handle requests from client fibers after/during - * subsystem shutdown. - */ - if (fiber_shutdown(box_shutdown_timeout) != 0) { - diag_log(); - panic("cannot gracefully shutdown client fibers"); - } replication_shutdown(); gc_shutdown(); engine_shutdown(); + fiber_pool_shutdown(&tx_fiber_pool); } void box_shutdown(void) { + /* + * Watcher should be shutdown before subsystems shutdown because + * it may execute client code. It can be shutdown before or + * after client fiber shutdown. Both cases are correct. But + * if we do it after we may have noisy log fiber creation failure + * for async watcher execution. + */ + box_watcher_shutdown(); + /* + * Finish client fibers before other subsystems shutdown so that + * we won't get unexpected request to shutdown subsystems from + * client code. 
+ */ + if (fiber_shutdown(box_shutdown_timeout) != 0) { + diag_log(); + panic("cannot gracefully shutdown client fibers"); + } box_storage_shutdown(); } diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 6223bdeeda..a3245d668c 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -4304,17 +4304,21 @@ int iproto_shutdown(double timeout) { assert(iproto_is_shutting_down); - return iproto_drop_connections(timeout); + if (iproto_drop_connections(timeout) != 0) + return -1; + for (int i = 0; i < iproto_threads_count; i++) { + cbus_stop_loop(&iproto_threads[i].net_pipe); + cpipe_destroy(&iproto_threads[i].net_pipe); + if (cord_join(&iproto_threads[i].net_cord) != 0) + panic_syserror("iproto cord join failed"); + } + return 0; } void iproto_free(void) { for (int i = 0; i < iproto_threads_count; i++) { - cbus_stop_loop(&iproto_threads[i].net_pipe); - cpipe_destroy(&iproto_threads[i].net_pipe); - if (cord_join(&iproto_threads[i].net_cord) != 0) - panic_syserror("iproto cord join failed"); mh_i32_delete(iproto_threads[i].req_handlers); /* * Close socket descriptor to prevent hot standby instance diff --git a/src/lib/core/fiber_pool.c b/src/lib/core/fiber_pool.c index bd5f47284a..4bb30af361 100644 --- a/src/lib/core/fiber_pool.c +++ b/src/lib/core/fiber_pool.c @@ -188,26 +188,21 @@ fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size, } void -fiber_pool_destroy(struct fiber_pool *pool) +fiber_pool_shutdown(struct fiber_pool *pool) { - /** Endpoint has connected pipes or unfetched messages */ cbus_endpoint_destroy(&pool->endpoint, NULL); - /** - * At this point all messages are started to execution because last - * cbus poison message was fired (endpoint_destroy condition). - * We won't to have new messages from cbus and can send wakeup - * to each idle fiber. In this case idle fiber can not fetch any - * new message and will exit. We adjust idle_timeout to. 
- */ - pool->idle_timeout = 0; - struct fiber *idle_fiber; - rlist_foreach_entry(idle_fiber, &pool->idle, state) - fiber_wakeup(idle_fiber); + struct fiber *idle_fiber, *tmp; + rlist_foreach_entry_safe(idle_fiber, &pool->idle, state, tmp) + fiber_cancel(idle_fiber); /** * Just wait on fiber exit condition until all fibers are done */ while (pool->size > 0) fiber_cond_wait(&pool->worker_cond); - fiber_cond_destroy(&pool->worker_cond); } +void +fiber_pool_destroy(struct fiber_pool *pool) +{ + fiber_cond_destroy(&pool->worker_cond); +} diff --git a/src/lib/core/fiber_pool.h b/src/lib/core/fiber_pool.h index ede8b84d66..3c068af696 100644 --- a/src/lib/core/fiber_pool.h +++ b/src/lib/core/fiber_pool.h @@ -92,6 +92,10 @@ fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size, void fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size); +/** Shutdown fiber pool. Finish all idle fibers left. */ +void +fiber_pool_shutdown(struct fiber_pool *pool); + /** * Destroy a fiber pool */ diff --git a/test/box-luatest/graceful_shutdown_test.lua b/test/box-luatest/graceful_shutdown_test.lua index bf7bf033b0..dfc601ca49 100644 --- a/test/box-luatest/graceful_shutdown_test.lua +++ b/test/box-luatest/graceful_shutdown_test.lua @@ -122,7 +122,7 @@ g.test_hung_requests_aborted = function() g.server:stop() local res, err = fut:result() t.assert_not(res) - t.assert_equals(tostring(err), 'Peer closed') + t.assert_equals(tostring(err), 'fiber is cancelled') t.assert_equals(conn.state, 'error') t.assert_equals(conn.error, 'Peer closed') conn:close() diff --git a/test/box-luatest/shutdown_test.lua b/test/box-luatest/shutdown_test.lua index 636aeeeba0..90a7dfd6b2 100644 --- a/test/box-luatest/shutdown_test.lua +++ b/test/box-luatest/shutdown_test.lua @@ -144,7 +144,7 @@ g.test_shutdown_of_hanging_iproto_request = function(cg) end) local log = fio.pathjoin(cg.server.workdir, cg.server.alias .. 
'.log') test_no_hang_on_shutdown(cg.server) - t.assert(cg.server:grep_log('cannot gracefully shutdown iproto', nil, + t.assert(cg.server:grep_log('cannot gracefully shutdown client fibers', nil, {filename = log})) end @@ -188,3 +188,31 @@ g.test_shutdown_memtx_gc_free_tuples = function(cg) end) server:stop() end + +local g_idle_pool = t.group('idle pool') + +g_idle_pool.before_each(function(cg) + cg.server = server:new({ + env = { + TARANTOOL_RUN_BEFORE_BOX_CFG = [[ + local tweaks = require('internal.tweaks') + tweaks.box_fiber_pool_idle_timeout = 100 + ]] + } + }) + cg.server:start() +end) + +g_idle_pool.after_each(function(cg) + if cg.server ~= nil then + cg.server:drop() + end +end) + +-- Test shutdown with idle fiber in fiber pool. +g_idle_pool.test_shutdown_fiber_pool_with_idle_fibers = function(cg) + t.tarantool.skip_if_not_debug() + -- Just to create idle fiber in pool after request. + cg.server:exec(function() end) + test_no_hang_on_shutdown(cg.server) +end -- GitLab