Skip to content
Snippets Groups Projects
Commit 95b423ce authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor(node): use watch instead of cond for waking up MainLoop

parent 728a526b
No related branches found
No related tags found
1 merge request!421refactor(node): use watch instead of cond for waking up MainLoop
Pipeline #14210 passed
...@@ -12,8 +12,9 @@ use ::raft::StorageError; ...@@ -12,8 +12,9 @@ use ::raft::StorageError;
use ::raft::INVALID_ID; use ::raft::INVALID_ID;
use ::tarantool::error::{TarantoolError, TransactionError}; use ::tarantool::error::{TarantoolError, TransactionError};
use ::tarantool::fiber; use ::tarantool::fiber;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::{oneshot, watch}; use ::tarantool::fiber::r#async::{oneshot, watch};
use ::tarantool::fiber::{Cond, Mutex}; use ::tarantool::fiber::Mutex;
use ::tarantool::proc; use ::tarantool::proc;
use ::tarantool::tlua; use ::tarantool::tlua;
use ::tarantool::transaction::start_transaction; use ::tarantool::transaction::start_transaction;
...@@ -885,7 +886,7 @@ impl NodeImpl { ...@@ -885,7 +886,7 @@ impl NodeImpl {
struct MainLoop { struct MainLoop {
_loop: Option<fiber::UnitJoinHandle<'static>>, _loop: Option<fiber::UnitJoinHandle<'static>>,
loop_cond: Rc<Cond>, loop_waker: watch::Sender<()>,
stop_flag: Rc<Cell<bool>>, stop_flag: Rc<Cell<bool>>,
} }
...@@ -895,7 +896,7 @@ struct MainLoopArgs { ...@@ -895,7 +896,7 @@ struct MainLoopArgs {
struct MainLoopState { struct MainLoopState {
next_tick: Instant, next_tick: Instant,
loop_cond: Rc<Cond>, loop_waker: watch::Receiver<()>,
stop_flag: Rc<Cell<bool>>, stop_flag: Rc<Cell<bool>>,
} }
...@@ -903,30 +904,30 @@ impl MainLoop { ...@@ -903,30 +904,30 @@ impl MainLoop {
pub const TICK: Duration = Duration::from_millis(100); pub const TICK: Duration = Duration::from_millis(100);
fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self { fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self {
let loop_cond: Rc<Cond> = Default::default(); let (loop_waker_tx, loop_waker_rx) = watch::channel(());
let stop_flag: Rc<Cell<bool>> = Default::default(); let stop_flag: Rc<Cell<bool>> = Default::default();
let args = MainLoopArgs { node_impl }; let args = MainLoopArgs { node_impl };
let initial_state = MainLoopState { let initial_state = MainLoopState {
next_tick: Instant::now(), next_tick: Instant::now(),
loop_cond: loop_cond.clone(), loop_waker: loop_waker_rx,
stop_flag: stop_flag.clone(), stop_flag: stop_flag.clone(),
}; };
Self { Self {
// implicit yield // implicit yield
_loop: loop_start!("raft_main_loop", Self::iter_fn, args, initial_state), _loop: loop_start!("raft_main_loop", Self::iter_fn, args, initial_state),
loop_cond, loop_waker: loop_waker_tx,
stop_flag, stop_flag,
} }
} }
pub fn wakeup(&self) { pub fn wakeup(&self) {
self.loop_cond.broadcast(); let _ = self.loop_waker.send(());
} }
async fn iter_fn(args: &MainLoopArgs, state: &mut MainLoopState) -> FlowControl { async fn iter_fn(args: &MainLoopArgs, state: &mut MainLoopState) -> FlowControl {
state.loop_cond.wait_timeout(Self::TICK); // yields let _ = state.loop_waker.changed().timeout(Self::TICK).await;
if state.stop_flag.take() { if state.stop_flag.take() {
return FlowControl::Break; return FlowControl::Break;
} }
...@@ -966,7 +967,7 @@ impl MainLoop { ...@@ -966,7 +967,7 @@ impl MainLoop {
impl Drop for MainLoop { impl Drop for MainLoop {
fn drop(&mut self) { fn drop(&mut self) {
self.stop_flag.set(true); self.stop_flag.set(true);
self.loop_cond.broadcast(); let _ = self.loop_waker.send(());
self._loop.take().unwrap().join(); // yields self._loop.take().unwrap().join(); // yields
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment