diff --git a/src/loop.rs b/src/loop.rs new file mode 100644 index 0000000000000000000000000000000000000000..55477375e26a48b3f239af23b737a8214a66accb --- /dev/null +++ b/src/loop.rs @@ -0,0 +1,37 @@ +use tarantool::fiber; + +/// Fancy wrapper for tarantool fibers with a loop. +pub struct Loop(fiber::UnitJoinHandle<'static>); + +pub enum FlowControl { + Continue, + Break, +} + +impl Loop { + pub fn start<A: 'static, S: 'static>( + name: impl Into<String>, + iter_fn: impl Fn(&A, &mut S) -> FlowControl + 'static, + args: A, + mut state: S, + ) -> Self { + #[allow(clippy::while_let_loop)] + let loop_fn = move || loop { + match iter_fn(&args, &mut state) { + FlowControl::Continue => continue, + FlowControl::Break => break, + }; + }; + let fiber = fiber::Builder::new() + .name(name) + .proc(loop_fn) + .start() + .unwrap(); + + Self(fiber) + } + + pub fn join(self) { + self.0.join() + } +} diff --git a/src/main.rs b/src/main.rs index 3c05e56de7071f1a68a03c9b13cb6d45d43312ad..98d2018123686d8bd7aa42456a43d1252e3f28b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,6 +27,7 @@ mod args; mod discovery; mod ipc; mod kvcell; +mod r#loop; mod mailbox; mod tarantool; mod tlog; diff --git a/src/traft/node.rs b/src/traft/node.rs index 46c26b63356f955100b91e0af157836dce3c52b8..eb098f4e77fd476aec3eed3ecf4bdc28aebf803a 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -16,6 +16,7 @@ use ::tarantool::fiber::{Cond, Mutex}; use ::tarantool::proc; use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; +use std::cell::Cell; use std::cell::RefCell; use std::collections::HashMap; use std::collections::HashSet; @@ -25,6 +26,7 @@ use std::time::Duration; use std::time::Instant; use crate::kvcell::KVCell; +use crate::r#loop::{FlowControl, Loop}; use crate::stringify_cfunc; use crate::traft::ContextCoercion as _; use crate::traft::InstanceId; @@ -85,10 +87,9 @@ pub struct Node { node_impl: Rc<Mutex<NodeImpl>>, pub(super) storage: RaftSpaceAccess, - _main_loop: fiber::UnitJoinHandle<'static>, + main_loop: MainLoop, _conf_change_loop: fiber::UnitJoinHandle<'static>, status: Rc<RefCell<Status>>, - raft_loop_cond: Rc<Cond>, } impl std::fmt::Debug for Node { @@ -100,8 +101,6 @@ impl std::fmt::Debug for Node { } impl Node { - pub const TICK: Duration = Duration::from_millis(100); - /// Initialize the raft node. /// **This function yields** pub fn new(storage: RaftSpaceAccess) -> Result<Self, RaftError> { @@ -116,14 +115,6 @@ impl Node { })); let node_impl = Rc::new(Mutex::new(node_impl)); - let raft_loop_cond = Rc::new(Cond::new()); - - let main_loop_fn = { - let status = status.clone(); - let node_impl = node_impl.clone(); - let raft_loop_cond = raft_loop_cond.clone(); - move || raft_main_loop(status, node_impl, raft_loop_cond) - }; let conf_change_loop_fn = { let status = status.clone(); @@ -133,20 +124,15 @@ impl Node { let node = Node { raft_id, - node_impl, - status, - raft_loop_cond, - _main_loop: fiber::Builder::new() - .name("raft_main_loop") - .proc(main_loop_fn) - .start() - .unwrap(), + main_loop: MainLoop::start(status.clone(), node_impl.clone()), // yields _conf_change_loop: fiber::Builder::new() .name("raft_conf_change_loop") .proc(conf_change_loop_fn) .start() .unwrap(), + node_impl, storage, + status, }; // Wait for the node to enter the main loop @@ -266,7 +252,7 @@ impl Node { let mut node_impl = self.node_impl.lock(); let res = f(&mut *node_impl); drop(node_impl); - self.raft_loop_cond.broadcast(); + self.main_loop.wakeup(); res } @@ -304,8 +290,8 @@ impl NodeImpl { let peer_storage = Storage::peers_access().clone(); let pool = ConnectionPool::builder(peer_storage) .handler_name(stringify_cfunc!(raft_interact)) - .call_timeout(Node::TICK * 4) - .connect_timeout(Node::TICK * 4) + .call_timeout(MainLoop::TICK * 4) + .connect_timeout(MainLoop::TICK * 4) .inactivity_timeout(Duration::from_secs(60)) .build(); @@ -795,28 +781,79 @@ impl NodeImpl { } } -fn raft_main_loop( +struct MainLoop { + _loop: Option<Loop>, + loop_cond: Rc<Cond>, + stop_flag: Rc<Cell<bool>>, +} + +struct MainLoopArgs { status: Rc<RefCell<Status>>, node_impl: Rc<Mutex<NodeImpl>>, - raft_loop_cond: Rc<Cond>, -) { - let mut next_tick = Instant::now() + Node::TICK; +} - loop { - raft_loop_cond.wait_timeout(Node::TICK); +struct MainLoopState { + next_tick: Instant, + loop_cond: Rc<Cond>, + stop_flag: Rc<Cell<bool>>, +} + +impl MainLoop { + pub const TICK: Duration = Duration::from_millis(100); + + fn start(status: Rc<RefCell<Status>>, node_impl: Rc<Mutex<NodeImpl>>) -> Self { + let loop_cond: Rc<Cond> = Default::default(); + let stop_flag: Rc<Cell<bool>> = Default::default(); + + let args = MainLoopArgs { status, node_impl }; + let initial_state = MainLoopState { + next_tick: Instant::now(), + loop_cond: loop_cond.clone(), + stop_flag: stop_flag.clone(), + }; + + Self { + // implicit yield + _loop: Some(Loop::start( + "raft_main_loop", + Self::iter_fn, + args, + initial_state, + )), + loop_cond, + stop_flag, + } + } + + pub fn wakeup(&self) { + self.loop_cond.broadcast(); + } + + fn iter_fn(args: &MainLoopArgs, state: &mut MainLoopState) -> FlowControl { + state.loop_cond.wait_timeout(Self::TICK); // yields + if state.stop_flag.take() { + return FlowControl::Break; + } + + let mut node_impl = args.node_impl.lock(); // yields + if state.stop_flag.take() { + return FlowControl::Break; + } - let mut node_impl = node_impl.lock(); node_impl.cleanup_notifications(); let now = Instant::now(); - if now > next_tick { - next_tick = now + Node::TICK; + if now > state.next_tick { + state.next_tick = now + Self::TICK; node_impl.raw_node.tick(); } let mut topology_changed = false; let mut expelled = false; - node_impl.advance(&status, &mut topology_changed, &mut expelled); + node_impl.advance(&args.status, &mut topology_changed, &mut expelled); // yields + if state.stop_flag.take() { + return FlowControl::Break; + } if expelled { crate::tarantool::exit(0); @@ -833,6 +870,16 @@ fn raft_main_loop( crate::tarantool::set_cfg(&box_cfg); } } + + FlowControl::Continue + } +} + +impl Drop for MainLoop { + fn drop(&mut self) { + self.stop_flag.set(true); + self.loop_cond.broadcast(); + self._loop.take().unwrap().join(); // yields } }