From cd54b6c6274a24a972eaba777bb223277adc9387 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Mon, 5 Dec 2022 18:50:57 +0300 Subject: [PATCH] refactor(loop): r#loop::Loop::start -> crate::loop_start! Async callbacks support is very poor in rust, so using a macro is one of the few ways to have somewhat friendly code. --- src/loop.rs | 51 +++++++++++++++++++---------------------------- src/traft/node.rs | 14 +++++-------- 2 files changed, 26 insertions(+), 39 deletions(-) diff --git a/src/loop.rs b/src/loop.rs index 55477375e2..1b12b97212 100644 --- a/src/loop.rs +++ b/src/loop.rs @@ -1,37 +1,28 @@ -use tarantool::fiber; - -/// Fancy wrapper for tarantool fibers with a loop. -pub struct Loop(fiber::UnitJoinHandle<'static>); - pub enum FlowControl { Continue, Break, } -impl Loop { - pub fn start<A: 'static, S: 'static>( - name: impl Into<String>, - iter_fn: impl Fn(&A, &mut S) -> FlowControl + 'static, - args: A, - mut state: S, - ) -> Self { - #[allow(clippy::while_let_loop)] - let loop_fn = move || loop { - match iter_fn(&args, &mut state) { - FlowControl::Continue => continue, - FlowControl::Break => break, - }; - }; - let fiber = fiber::Builder::new() - .name(name) - .proc(loop_fn) +#[macro_export] +macro_rules! loop_start { + ($name:expr, $fn:expr, $args:expr, $state:expr $(,)?) => { + ::tarantool::fiber::Builder::new() + .name($name) + .proc(move || { + ::tarantool::fiber::block_on(async { + let args = $args; + let mut state = $state; + let iter_fn = $fn; + loop { + match iter_fn(&args, &mut state).await { + FlowControl::Continue => continue, + FlowControl::Break => break, + }; + } + }) + }) .start() - .unwrap(); - - Self(fiber) - } - - pub fn join(self) { - self.0.join() - } + .unwrap() + .into() + }; } diff --git a/src/traft/node.rs b/src/traft/node.rs index fdb5d59f04..0c3e574833 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -31,7 +31,8 @@ use std::time::Instant; use crate::governor::raft_conf_change; use crate::governor::waiting_migrations; use crate::kvcell::KVCell; -use crate::r#loop::{FlowControl, Loop}; +use crate::loop_start; +use crate::r#loop::FlowControl; use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName}; use crate::stringify_cfunc; use crate::traft::rpc; @@ -890,7 +891,7 @@ impl NodeImpl { } struct MainLoop { - _loop: Option<Loop>, + _loop: Option<fiber::UnitJoinHandle<'static>>, loop_cond: Rc<Cond>, stop_flag: Rc<Cell<bool>>, } @@ -922,12 +923,7 @@ impl MainLoop { Self { // implicit yield - _loop: Some(Loop::start( - "raft_main_loop", - Self::iter_fn, - args, - initial_state, - )), + _loop: loop_start!("raft_main_loop", Self::iter_fn, args, initial_state), loop_cond, stop_flag, } @@ -937,7 +933,7 @@ impl MainLoop { self.loop_cond.broadcast(); } - fn iter_fn(args: &MainLoopArgs, state: &mut MainLoopState) -> FlowControl { + async fn iter_fn(args: &MainLoopArgs, state: &mut MainLoopState) -> FlowControl { state.loop_cond.wait_timeout(Self::TICK); // yields if state.stop_flag.take() { return FlowControl::Break; -- GitLab