Skip to content
Snippets Groups Projects
Commit 30ebfe06 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: unify passing rust closures as fiber functions via fiber::Context

parent 8a34f411
No related branches found
No related tags found
1 merge request!444Gmoshkin/fix fiber detach
......@@ -14,6 +14,7 @@ use crate::error::{TarantoolError, TarantoolErrorCode};
use crate::ffi::has_fiber_id;
use crate::ffi::tarantool::fiber_sleep;
use crate::ffi::{lua, tarantool as ffi};
use crate::static_assert;
use crate::time::Instant;
use crate::tlua::{self as tlua, AsLua};
use crate::unwrap_ok_or;
......@@ -34,6 +35,7 @@ use std::cell::UnsafeCell;
use std::ffi::CString;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{align_of, size_of};
use std::os::raw::c_void;
use std::ptr::NonNull;
use std::time::Duration;
......@@ -626,10 +628,10 @@ where
ffi::fiber_new_ex(
cname.as_ptr(),
attr.inner,
Some(Self::trampoline_for_immediate),
Some(Self::trampoline_for_ffi::<false>),
)
} else {
ffi::fiber_new(cname.as_ptr(), Some(Self::trampoline_for_immediate))
ffi::fiber_new(cname.as_ptr(), Some(Self::trampoline_for_ffi::<false>))
}
};
......@@ -642,10 +644,13 @@ where
// Prepare the storage for rust closure & result value.
let result_cell = needs_returning::<T>().then(FiberResultCell::default);
let result_ptr = result_cell
.as_ref()
.map_or(std::ptr::null_mut(), |cell| cell.get());
let closure_ptr = Box::into_raw(Box::new(f));
// Prepare fiber context for passing fiber arguments.
let mut ctx = Box::<Context>::default();
if let Some(result_cell) = &result_cell {
ctx.fiber_result_ptr = result_cell.get() as _;
}
ctx.fiber_rust_closure = Box::into_raw(Box::new(f)) as _;
// Save the fiber id, if possible, before starting the fiber.
let mut id = None;
......@@ -653,7 +658,8 @@ where
id = Some(ffi::fiber_id(inner.as_ptr()));
}
ffi::fiber_start(inner.as_ptr(), closure_ptr, result_ptr);
// Cannot use fiber_set_ctx, because fiber_start will overwrite it.
ffi::fiber_start(inner.as_ptr(), Box::into_raw(ctx));
if is_joinable {
// At this point the fiber could have already finished execution
......@@ -671,28 +677,6 @@ where
}
}
unsafe extern "C" fn trampoline_for_immediate(mut args: VaList) -> i32 {
// Extract arugments from the va_list.
let f = Box::from_raw(args.get::<*const ()>() as *mut F);
let result_ptr = args.get::<*const ()>() as *mut Option<T>;
// On newer tarantool versions all fibers are cancellable.
// Let's do the same on older versions.
ffi::fiber_set_cancellable(true);
// Call `f` and drop the closure.
let t = f();
// Write results into the join handle if needed.
if needs_returning::<T>() {
assert!(!result_ptr.is_null());
std::ptr::write(result_ptr, Some(t));
} else if cfg!(debug_assertions) {
assert!(result_ptr.is_null());
}
0
}
/// Creates a fiber and schedules it for execution at some point later.
/// Does **NOT** yield.
///
......@@ -731,10 +715,10 @@ where
ffi::fiber_new_ex(
cname.as_ptr(),
attr.inner,
Some(Self::trampoline_for_deferred_ffi),
Some(Self::trampoline_for_ffi::<true>),
)
} else {
ffi::fiber_new(cname.as_ptr(), Some(Self::trampoline_for_deferred_ffi))
ffi::fiber_new(cname.as_ptr(), Some(Self::trampoline_for_ffi::<true>))
}
};
......@@ -747,10 +731,13 @@ where
// Prepare the storage for rust closure & result value.
let result_cell = needs_returning::<T>().then(FiberResultCell::default);
let result_ptr = result_cell
.as_ref()
.map_or(std::ptr::null_mut(), |cell| cell.get());
let ctx = Box::new(DeferredFiberContext { f, result_ptr });
// Prepare fiber context.
let mut ctx = Box::<Context>::default();
if let Some(result_cell) = &result_cell {
ctx.fiber_result_ptr = result_cell.get() as _;
}
ctx.fiber_rust_closure = Box::into_raw(Box::new(f)) as _;
ffi::fiber_set_ctx(inner.as_ptr(), Box::into_raw(ctx) as _);
ffi::fiber_wakeup(inner.as_ptr());
......@@ -773,43 +760,45 @@ where
}
}
unsafe extern "C" fn trampoline_for_deferred_ffi(_: VaList) -> i32 {
// Extract arugments from fiber context.
let fiber_self = ffi::fiber_self();
let ctx = ffi::fiber_get_ctx(fiber_self);
let ctx = Box::from_raw(ctx.cast::<DeferredFiberContext<F, T>>());
// Overwrite the context so that the callback doesn't mess it up somehow.
ffi::fiber_set_ctx(fiber_self, std::ptr::null_mut());
unsafe extern "C" fn trampoline_for_ffi<const VIA_CONTEXT: bool>(mut args: VaList) -> i32 {
// On newer tarantool versions all fibers are cancellable.
// Let's do the same on older versions.
ffi::fiber_set_cancellable(true);
let ctx;
if VIA_CONTEXT {
// Extract arguments from fiber context.
let fiber_self = ffi::fiber_self();
ctx = ffi::fiber_get_ctx(fiber_self).cast::<Context>();
} else {
// Extract arguments from the va_list.
ctx = args.get::<*const Context>() as _;
}
debug_assert!(context_is_valid(ctx));
let mut ctx = Box::from_raw(ctx);
// Remove the closure pointer from the context,
// so that nobody can mess it up somehow.
let f = std::mem::replace(&mut ctx.fiber_rust_closure, std::ptr::null_mut());
let f = Box::from_raw(f.cast::<F>());
// Call `f` and drop the closure.
let t = (ctx.f)();
let t = (f)();
// Write results into the join handle if needed.
if needs_returning::<T>() {
assert!(!ctx.result_ptr.is_null());
std::ptr::write(ctx.result_ptr, Some(t));
} else if cfg!(debug_assertions) {
assert!(ctx.result_ptr.is_null());
assert!(!ctx.fiber_result_ptr.is_null());
std::ptr::write(ctx.fiber_result_ptr.cast(), Some(t));
} else {
debug_assert!(ctx.fiber_result_ptr.is_null());
}
// The only thing this return value controls is wether the last error
// will be logged, which we don't care about.
0
}
}
struct DeferredFiberContext<F, T> {
f: F,
result_ptr: *mut Option<T>,
}
impl<'f, F, T> Fyber<F, T>
where
F: FnOnce() -> T + 'f,
T: 'f,
{
/// Creates a joinable **LUA** fiber and schedules it for execution at some
/// point later. Does **NOT** yield.
pub fn spawn_lua(
......@@ -2030,6 +2019,127 @@ impl Drop for LatchGuard {
}
}
////////////////////////////////////////////////////////////////////////////////
// Context
////////////////////////////////////////////////////////////////////////////////
/// Makes a best effort attempt to check if the given pointer actually points at
/// a valid instance of `Context` struct.
///
/// # Safety
/// If the pointer doesn't actually point at a `Context` struct this function
/// may crash or invoke undefined behaviour.
///
/// Unfortunately modern operating systems don't give us a good way to check if
/// a memory address is writable (other than some hacks with the `read` system
/// call, see <https://stackoverflow.com/a/14437277/3093427>).
/// So currently this is the best thing we can do.
#[inline]
pub unsafe fn context_is_valid(context: *mut Context) -> bool {
if context as usize == 0 {
return false;
}
if (context as usize) % CONTEXT_ALIGNMENT != 0 {
return false;
}
// This is our best effort to guard against someone overriding the fiber
// context by calling fiber_set_ctx. This should be enough to
// distinguish from something which is not a `fiber::Context` struct.
let magic_ptr = std::ptr::addr_of!((*context).magic);
if *magic_ptr != CONTEXT_MAGIC {
return false;
}
let size_ptr = std::ptr::addr_of!((*context).size);
if *size_ptr != CONTEXT_SIZE {
return false;
}
// This is should guard us against using context which was set from code
// which was compiled with a different version of tarantool-module,
// e.g. if there's multiple dynamic modules.
let version_ptr = std::ptr::addr_of!((*context).version);
if *version_ptr != CONTEXT_VERSION {
return false;
}
// There's still a small probability that it's invalid, but what are you going to do?
true
}
/// A random number to guard our fiber context from changes by someone else.
pub const CONTEXT_MAGIC: u64 = 0x69F1BE5C047E8769;
/// Size of the [`fiber::Context`] struct.
///
/// [`fiber::Context`]: Context
pub const CONTEXT_SIZE: u64 = size_of::<Context>() as _;
/// Alignment of the [`fiber::Context`] struct.
///
/// [`fiber::Context`]: Context
pub const CONTEXT_ALIGNMENT: usize = align_of::<Context>() as _;
static_assert!(CONTEXT_ALIGNMENT == 8, "this should never change");
/// Current version of the [`fiber::Context`] struct. This must be bumped every
/// time it's definition changes.
///
/// [`fiber::Context`]: Context
pub const CONTEXT_VERSION: u64 = 1;
#[repr(C)]
pub struct Context {
/// Special field for ffi-safety.
magic: u64,
/// Size of this struct.
///
/// Should always be equal to [`size_of`]`<Context>()`.
/// May not be so, if context was set from code compiled with a different
/// version of tarantool-module.
///
/// Useful for ffi interop.
size: u64,
/// Version number of this struct.
///
/// Should be always be equal to [`CONTEXT_VERSION`].
/// May not be so, if context was set from code compiled with a different
/// version of tarantool-module.
version: u64,
/// Special field used internally for implementation of deferred fibers.
fiber_rust_closure: *mut (),
/// Special field used internally for implementation of deferred fibers.
fiber_result_ptr: *mut (),
}
impl std::fmt::Debug for Context {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Context")
.field("magic", &self.magic)
.field("size", &self.size)
.field("version", &self.version)
.finish_non_exhaustive()
}
}
impl Default for Context {
#[inline(always)]
fn default() -> Self {
Self {
magic: CONTEXT_MAGIC,
size: CONTEXT_SIZE,
version: CONTEXT_VERSION,
fiber_rust_closure: std::ptr::null_mut(),
fiber_result_ptr: std::ptr::null_mut(),
}
}
}
////////////////////////////////////////////////////////////////////////////////
// misc
////////////////////////////////////////////////////////////////////////////////
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment