diff --git a/src/traft/node.rs b/src/traft/node.rs index ff8bd9a7f7b93c77d19e9609b677a35bc4fd4136..aca3cbc43521208433d1cabb3106a6d37c01bf94 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -11,7 +11,7 @@ use ::raft::StateRole as RaftStateRole; use ::raft::INVALID_ID; use ::tarantool::error::TransactionError; use ::tarantool::fiber; -use ::tarantool::fiber::Mutex; +use ::tarantool::fiber::{Cond, Mutex}; use ::tarantool::proc; use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; @@ -73,6 +73,7 @@ pub struct Node { _main_loop: fiber::UnitJoinHandle<'static>, _conf_change_loop: fiber::UnitJoinHandle<'static>, status: Rc<RefCell<Status>>, + raft_loop_cond: Rc<Cond>, notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>, topology_cache: CachedCell<RaftTerm, Topology>, lc: Cell<Option<LogicalClock>>, @@ -100,14 +101,15 @@ impl Node { raft_state: "Follower".into(), is_ready: false, })); - + let raft_loop_cond = Rc::new(Cond::new()); let notifications = Rc::new(RefCell::new(HashMap::new())); let main_loop_fn = { let status = status.clone(); let raw_node = raw_node.clone(); + let raft_loop_cond = raft_loop_cond.clone(); let notifications = notifications.clone(); - move || raft_main_loop(status, raw_node, notifications) + move || raft_main_loop(status, raw_node, raft_loop_cond, notifications) }; let conf_change_loop_fn = { @@ -120,6 +122,7 @@ impl Node { raft_id: cfg.id, notifications, status, + raft_loop_cond, _main_loop: fiber::Builder::new() .name("raft_main_loop") .proc(main_loop_fn) @@ -395,7 +398,7 @@ impl Node { let mut raw_node = self.raw_node.lock(); let res = f(&mut *raw_node); drop(raw_node); - event::broadcast(Event::RaftLoopNeeded); + self.raft_loop_cond.broadcast(); res } @@ -605,6 +608,7 @@ fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) { fn raft_main_loop( status: Rc<RefCell<Status>>, raw_node: Rc<Mutex<RawNode>>, + raft_loop_cond: Rc<Cond>, notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>, ) { let mut next_tick = Instant::now() + Node::TICK; @@ -625,7 +629,7 @@ fn raft_main_loop( .borrow_mut() .retain(|_, notify: &mut Notify| !notify.is_closed()); - event::wait_timeout(Event::RaftLoopNeeded, Node::TICK).expect("Events must be initialized"); + raft_loop_cond.wait_timeout(Node::TICK); let mut raw_node = raw_node.lock(); let now = Instant::now();