diff --git a/src/traft/node.rs b/src/traft/node.rs index 0efe038ed5945be7b8d31b34d5c6f09169b92862..9389cc1ce220371b816ccf9d682d827cc2802b20 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -12,8 +12,9 @@ use ::raft::StorageError; use ::raft::INVALID_ID; use ::tarantool::error::{TarantoolError, TransactionError}; use ::tarantool::fiber; +use ::tarantool::fiber::r#async::timeout::IntoTimeout as _; use ::tarantool::fiber::r#async::{oneshot, watch}; -use ::tarantool::fiber::{Cond, Mutex}; +use ::tarantool::fiber::Mutex; use ::tarantool::proc; use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; @@ -885,7 +886,7 @@ impl NodeImpl { struct MainLoop { _loop: Option<fiber::UnitJoinHandle<'static>>, - loop_cond: Rc<Cond>, + loop_waker: watch::Sender<()>, stop_flag: Rc<Cell<bool>>, } @@ -895,7 +896,7 @@ struct MainLoopArgs { struct MainLoopState { next_tick: Instant, - loop_cond: Rc<Cond>, + loop_waker: watch::Receiver<()>, stop_flag: Rc<Cell<bool>>, } @@ -903,30 +904,30 @@ impl MainLoop { pub const TICK: Duration = Duration::from_millis(100); fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self { - let loop_cond: Rc<Cond> = Default::default(); + let (loop_waker_tx, loop_waker_rx) = watch::channel(()); let stop_flag: Rc<Cell<bool>> = Default::default(); let args = MainLoopArgs { node_impl }; let initial_state = MainLoopState { next_tick: Instant::now(), - loop_cond: loop_cond.clone(), + loop_waker: loop_waker_rx, stop_flag: stop_flag.clone(), }; Self { // implicit yield _loop: loop_start!("raft_main_loop", Self::iter_fn, args, initial_state), - loop_cond, + loop_waker: loop_waker_tx, stop_flag, } } pub fn wakeup(&self) { - self.loop_cond.broadcast(); + let _ = self.loop_waker.send(()); } async fn iter_fn(args: &MainLoopArgs, state: &mut MainLoopState) -> FlowControl { - state.loop_cond.wait_timeout(Self::TICK); // yields + let _ = state.loop_waker.changed().timeout(Self::TICK).await; if state.stop_flag.take() { return FlowControl::Break; } @@ -966,7 +967,7 @@ impl MainLoop { impl Drop for MainLoop { fn drop(&mut self) { self.stop_flag.set(true); - self.loop_cond.broadcast(); + let _ = self.loop_waker.send(()); self._loop.take().unwrap().join(); // yields } }