diff --git a/changelogs/unreleased/gh-7473-system-fibers.md b/changelogs/unreleased/gh-7473-system-fibers.md new file mode 100644 index 0000000000000000000000000000000000000000..aad80c18a69000f07fec0846c34f58b9c9c69bfc --- /dev/null +++ b/changelogs/unreleased/gh-7473-system-fibers.md @@ -0,0 +1,4 @@ +## feature/core + +* Some of the internal fibers (e.g. connection's worker fiber, vinyl fibers + and others) cannot be cancelled from the Lua public API anymore (gh-7473). diff --git a/src/box/applier.cc b/src/box/applier.cc index 59f8f1193f53fd4421674580815c8d208a67c19b..8596023d454df6f50a446a2565f11c08858a7357 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -163,9 +163,9 @@ applier_check_sync(struct applier *applier) /** * A helper function to create an applier fiber. Basically, it's a wrapper - * around fiber_new_xc(), which appends the applier URI to the fiber name and - * makes the new fiber joinable. Note, this function creates a new fiber, but - * doesn't start it. + * around fiber_new_system_xc(), which appends the applier URI to the fiber name + * and makes the new fiber joinable. Note, this function creates a new fiber, + * but doesn't start it. */ static struct fiber * applier_fiber_new(struct applier *applier, const char *name, fiber_func func) @@ -173,7 +173,7 @@ applier_fiber_new(struct applier *applier, const char *name, fiber_func func) char buf[FIBER_NAME_MAX]; int pos = snprintf(buf, sizeof(buf), "%s/", name); uri_format(buf + pos, sizeof(buf) - pos, &applier->uri, false); - struct fiber *f = fiber_new_xc(buf, func); + struct fiber *f = fiber_new_system_xc(buf, func); fiber_set_joinable(f, true); return f; } diff --git a/src/box/gc.c b/src/box/gc.c index a9008bff45d1e5f0ba81c0e1793f06aa792f94ad..7037332c61f455ed17903ce077bd4af6a9347279 100644 --- a/src/box/gc.c +++ b/src/box/gc.c @@ -118,12 +118,12 @@ gc_init(void) fiber_cond_create(&gc.cleanup_cond); checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0); - gc.cleanup_fiber = fiber_new("gc", gc_cleanup_fiber_f); + gc.cleanup_fiber = fiber_new_system("gc", gc_cleanup_fiber_f); if (gc.cleanup_fiber == NULL) panic("failed to start garbage collection fiber"); - gc.checkpoint_fiber = fiber_new("checkpoint_daemon", - gc_checkpoint_fiber_f); + gc.checkpoint_fiber = fiber_new_system("checkpoint_daemon", + gc_checkpoint_fiber_f); if (gc.checkpoint_fiber == NULL) panic("failed to start checkpoint daemon fiber"); diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index aeed466cf3c0400a12dd0ab413ab9994c422fd4c..8f46152b6f3df18482ecc9138f01ab6345e11ecc 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -2709,7 +2709,7 @@ luaT_netbox_transport_start(struct lua_State *L) const char *name = tt_sprintf("%s:%s (net.box)", transport->opts.uri.host ?: "", transport->opts.uri.service ?: ""); - transport->worker = fiber_new(name, netbox_worker_f); + transport->worker = fiber_new_system(name, netbox_worker_f); if (transport->worker == NULL) { luaL_unref(L, LUA_REGISTRYINDEX, transport->coro_ref); transport->coro_ref = LUA_NOREF; diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index c95a05530bb4d5804925c05409cf96db2281d97e..8135ae0c98d1285b55f6cd4f2fcd1b7b40e36fc7 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -1310,7 +1310,7 @@ memtx_engine_new(const char *snap_dirname, bool force_recovery, } stailq_create(&memtx->gc_queue); - memtx->gc_fiber = fiber_new("memtx.gc", memtx_engine_gc_f); + memtx->gc_fiber = fiber_new_system("memtx.gc", memtx_engine_gc_f); if (memtx->gc_fiber == NULL) goto fail; diff --git a/src/box/raft.c b/src/box/raft.c index 24c10a5b32ceeaa53f83d6edbd89d2e5f1db9940..4d6f3113aabbddc1c13fd6bb5bdc7cda7839c23c 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -155,7 +155,8 @@ box_raft_schedule_async(struct raft *raft) { assert(raft == box_raft()); if (box_raft_worker == NULL) { - box_raft_worker = fiber_new("raft_worker", box_raft_worker_f); + box_raft_worker = fiber_new_system("raft_worker", + box_raft_worker_f); if (box_raft_worker == NULL) { /* * XXX: should be handled properly, no need to panic. diff --git a/src/box/recovery.cc b/src/box/recovery.cc index a596e9d095c71df7f4624369d4a878fb04130314..d2a19b34e8cae2b5fc623a2d5230bde942c98131 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -528,7 +528,7 @@ recovery_follow_local(struct recovery *r, struct xstream *stream, * xlog. */ assert(r->watcher == NULL); - r->watcher = fiber_new_xc(name, hot_standby_f); + r->watcher = fiber_new_system(name, hot_standby_f); fiber_set_joinable(r->watcher, true); fiber_start(r->watcher, r, stream, wal_dir_rescan_delay); } diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 7ac56e27e37f43df2cc0790414f366fe6254d47e..72adb823124041270fdea168be47c11c7b0032e4 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -3507,7 +3507,8 @@ vy_squash_schedule(struct vy_lsm *lsm, struct vy_entry entry, void *arg) /* Start the upsert squashing fiber on demand. */ if (sq->fiber == NULL) { - sq->fiber = fiber_new("vinyl.squash_queue", vy_squash_queue_f); + sq->fiber = fiber_new_system("vinyl.squash_queue", + vy_squash_queue_f); if (sq->fiber == NULL) goto fail; fiber_start(sq->fiber, sq); diff --git a/src/box/vy_log.c b/src/box/vy_log.c index 46f8944f29e7642f9e30429e039549abb23e9bd9..7d2dada256a76da2cbc6da8c867da52b3b443882 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -767,8 +767,8 @@ vy_log_init(const char *dir) diag_create(&vy_log.tx_diag); wal_init_vy_log(); fiber_cond_create(&vy_log.flusher_cond); - vy_log.flusher = fiber_new("vinyl.vylog_flusher", - vy_log_flusher_f); + vy_log.flusher = fiber_new_system("vinyl.vylog_flusher", + vy_log_flusher_f); if (vy_log.flusher == NULL) panic("failed to allocate vylog flusher fiber"); fiber_wakeup(vy_log.flusher); diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index 694415df2dcae6f960e9ba11d5f4d88b18f9dbd7..3582272734834131049432aa819d73bff1935b18 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -435,8 +435,8 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads, scheduler->run_env = run_env; scheduler->quota = quota; - scheduler->scheduler_fiber = fiber_new("vinyl.scheduler", - vy_scheduler_f); + scheduler->scheduler_fiber = fiber_new_system("vinyl.scheduler", + vy_scheduler_f); if (scheduler->scheduler_fiber == NULL) panic("failed to allocate vinyl scheduler fiber"); diff --git a/src/box/watcher.c b/src/box/watcher.c index 40b35ce6293b3032afe5352ff71781510ec846d7..0ffe3649045d579abe68b1d0dc031bb244fe41c5 100644 --- a/src/box/watcher.c +++ b/src/box/watcher.c @@ -118,8 +118,8 @@ static void watchable_wakeup_worker(struct watchable *watchable) { if (watchable->worker == NULL) { - watchable->worker = fiber_new("box.watchable", - watchable_worker_f); + watchable->worker = fiber_new_system("box.watchable", + watchable_worker_f); if (watchable->worker == NULL) { diag_log(); panic("failed to start box.watchable worker fiber"); diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 3e252f50cac56b9f34ea1f4f62ccc94418b72827..e2f427084e87b1f7c8b08fd37feca8f3367b608f 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -1920,8 +1920,8 @@ swim_new(uint64_t generation) rlist_create(&swim->dissemination_queue); rlist_create(&swim->on_member_event); stailq_create(&swim->event_queue); - swim->event_handler = fiber_new("SWIM event handler", - swim_event_handler_f); + swim->event_handler = fiber_new_system("SWIM event handler", + swim_event_handler_f); if (swim->event_handler == NULL) { swim_delete(swim); return NULL; diff --git a/src/main.cc b/src/main.cc index cee030a4086915eaf0b43bf4856033599b2ed61a..daacf13f4e1bede7e0143332f26bb02916d7cf62 100644 --- a/src/main.cc +++ b/src/main.cc @@ -127,7 +127,7 @@ static void sig_checkpoint(ev_loop * /* loop */, struct ev_signal * /* w */, int /* revents */) { - struct fiber *f = fiber_new("checkpoint", sig_checkpoint_f); + struct fiber *f = fiber_new_system("checkpoint", sig_checkpoint_f); if (f == NULL) { say_warn("failed to allocate checkpoint fiber"); return; @@ -767,6 +767,7 @@ main(int argc, char **argv) */ struct fiber_attr attr; fiber_attr_create(&attr); + attr.flags |= FIBER_IS_SYSTEM; attr.flags &= ~FIBER_IS_CANCELLABLE; on_shutdown_fiber = fiber_new_ex("on_shutdown", &attr, diff --git a/test/app-luatest/system_fiber_test.lua b/test/app-luatest/system_fiber_test.lua new file mode 100644 index 0000000000000000000000000000000000000000..6633411e56a30441df82dd0577e2ec029b691e33 --- /dev/null +++ b/test/app-luatest/system_fiber_test.lua @@ -0,0 +1,41 @@ +local server = require('test.luatest_helpers.server') +local g = require('luatest').group() + +g.before_all = function() + g.server = server:new{alias = 'default'} + g.server:start() +end + +g.after_all = function() + g.server:drop() +end + +g.test_system_fibers = function() + g.server:exec(function(uri) + local fiber = require('fiber') + local t = require('luatest') + + local conn = require('net.box').connect(uri) + local is_system = function(name) + return name:endswith('(net.box)') or + name == 'gc' or name == 'checkpoint_daemon' or + name:startswith('vinyl.') + end + + local system_fibers = {} + for fid, f in pairs(fiber.info()) do + if is_system(f.name) then + -- fiber.find() + fiber:cancel() == fiber.kill() + local fiber_temp = fiber.find(fid) + table.insert(system_fibers, fiber_temp) + fiber_temp:cancel() + end + end + + fiber.yield() + t.assert(conn:ping()) + for _, f in pairs(system_fibers) do + t.assert(f:status() ~= 'dead') + end + end, {g.server.net_box_uri}) +end