Skip to content
Snippets Groups Projects
Commit 8443bd93 authored by Vladislav Shpilevoy's avatar Vladislav Shpilevoy Committed by Kirill Yukhin
Browse files

fiber: introduce schedule_task() internal function

fiber._internal.schedule_task() is an API for a singleton fiber
worker object. It serves for not urgent delayed execution of
functions. Main purpose - schedule execution of a function, which
is going to yield, from a context, where a yield is not allowed.
Such as an FFI object's GC callback.

It will be used by SWIM and by fio, whose destruction yields, but
they need to use GC finalizer, where a yield is not allowed.

Part of #4727
parent a41bef3b
No related branches found
No related tags found
No related merge requests found
...@@ -828,6 +828,19 @@ lbox_fiber_set_joinable(struct lua_State *L) ...@@ -828,6 +828,19 @@ lbox_fiber_set_joinable(struct lua_State *L)
return 0; return 0;
} }
/**
* Alternative to fiber.sleep(infinite) which does not participate
* in an event loop at all until an explicit wakeup. This is less
* overhead. Useful for fibers sleeping most of the time.
*/
static int
lbox_fiber_stall(struct lua_State *L)
{
(void) L;
fiber_yield();
return 0;
}
static const struct luaL_Reg lbox_fiber_meta [] = { static const struct luaL_Reg lbox_fiber_meta [] = {
{"id", lbox_fiber_id}, {"id", lbox_fiber_id},
{"name", lbox_fiber_name}, {"name", lbox_fiber_name},
...@@ -865,6 +878,8 @@ static const struct luaL_Reg fiberlib[] = { ...@@ -865,6 +878,8 @@ static const struct luaL_Reg fiberlib[] = {
{"new", lbox_fiber_new}, {"new", lbox_fiber_new},
{"status", lbox_fiber_status}, {"status", lbox_fiber_status},
{"name", lbox_fiber_name}, {"name", lbox_fiber_name},
/* Internal functions, to hide in fiber.lua. */
{"stall", lbox_fiber_stall},
{NULL, NULL} {NULL, NULL}
}; };
......
...@@ -34,4 +34,65 @@ fiber.time = fiber_time ...@@ -34,4 +34,65 @@ fiber.time = fiber_time
fiber.time64 = fiber_time64 fiber.time64 = fiber_time64
fiber.clock = fiber_clock fiber.clock = fiber_clock
fiber.clock64 = fiber_clock64 fiber.clock64 = fiber_clock64
local stall = fiber.stall
fiber.stall = nil
local worker_next_task = nil
local worker_last_task = nil
local worker_fiber = nil
--
-- Worker is a singleton fiber for not urgent delayed execution of
-- functions. Main purpose - schedule execution of a function,
-- which is going to yield, from a context, where a yield is not
-- allowed. Such as an FFI object's GC callback.
--
local function worker_f()
local task
while true do
while true do
task = worker_next_task
if task then
break
end
stall()
end
worker_next_task = task.next
task.f(task.arg)
fiber.sleep(0)
end
end
local function worker_safe_f()
pcall(worker_f)
-- Worker_f never returns. If the execution is here, this
-- fiber is probably canceled and now is not able to sleep.
-- Create a new one.
worker_fiber = fiber.new(worker_safe_f)
end
worker_fiber = fiber.new(worker_safe_f)
local function worker_schedule_task(f, arg)
local task = {f = f, arg = arg}
if not worker_next_task then
worker_next_task = task
else
worker_last_task.next = task
end
worker_last_task = task
worker_fiber:wakeup()
end
-- Start from '_' to hide it from auto completion.
fiber._internal = fiber._internal or {}
fiber._internal.schedule_task = worker_schedule_task
setmetatable(fiber, {__serialize = function(self)
local res = table.copy(self)
res._internal = nil
return setmetatable(res, {})
end})
return fiber return fiber
...@@ -1561,6 +1561,78 @@ fiber.top() ...@@ -1561,6 +1561,78 @@ fiber.top()
--- ---
- error: fiber.top() is disabled. Enable it with fiber.top_enable() first - error: fiber.top() is disabled. Enable it with fiber.top_enable() first
... ...
--
-- fiber._internal.schedule_task() - API for internal usage for
-- delayed execution of a function.
--
glob_arg = {}
---
...
count = 0
---
...
function task_f(arg) \
count = count + 1 \
table.insert(glob_arg, arg) \
arg = arg + 1 \
if arg <= 3 then \
fiber._internal.schedule_task(task_f, arg) \
else \
fiber.self():cancel() \
error('Worker is broken') \
end \
end
---
...
for i = 1, 3 do \
local csw1 = fiber.info()[fiber.id()].csw \
fiber._internal.schedule_task(task_f, i) \
local csw2 = fiber.info()[fiber.id()].csw \
assert(csw1 == csw2 and type(csw1) == 'number') \
end
---
...
old_count = count
---
...
test_run:wait_cond(function() \
fiber.yield() \
if count == old_count then \
return true \
end \
old_count = count \
end)
---
- true
...
glob_arg
---
- - 1
- 2
- 3
- 2
- 3
- 3
...
count
---
- 6
...
-- Ensure, that after all tasks are finished, the worker didn't
-- stuck somewhere.
glob_arg = nil
---
...
fiber._internal.schedule_task(function(arg) glob_arg = arg end, 100)
---
...
fiber.yield()
---
...
glob_arg
---
- 100
...
-- cleanup -- cleanup
test_run:cmd("clear filter") test_run:cmd("clear filter")
--- ---
......
...@@ -688,6 +688,47 @@ tbl.time > 0 ...@@ -688,6 +688,47 @@ tbl.time > 0
fiber.top_disable() fiber.top_disable()
fiber.top() fiber.top()
--
-- fiber._internal.schedule_task() - API for internal usage for
-- delayed execution of a function.
--
glob_arg = {}
count = 0
function task_f(arg) \
count = count + 1 \
table.insert(glob_arg, arg) \
arg = arg + 1 \
if arg <= 3 then \
fiber._internal.schedule_task(task_f, arg) \
else \
fiber.self():cancel() \
error('Worker is broken') \
end \
end
for i = 1, 3 do \
local csw1 = fiber.info()[fiber.id()].csw \
fiber._internal.schedule_task(task_f, i) \
local csw2 = fiber.info()[fiber.id()].csw \
assert(csw1 == csw2 and type(csw1) == 'number') \
end
old_count = count
test_run:wait_cond(function() \
fiber.yield() \
if count == old_count then \
return true \
end \
old_count = count \
end)
glob_arg
count
-- Ensure, that after all tasks are finished, the worker didn't
-- stuck somewhere.
glob_arg = nil
fiber._internal.schedule_task(function(arg) glob_arg = arg end, 100)
fiber.yield()
glob_arg
-- cleanup -- cleanup
test_run:cmd("clear filter") test_run:cmd("clear filter")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment