diff --git a/src/governor/mod.rs b/src/governor/mod.rs index cbca4c6ec9ab08496ff6f62f313e5c4d70f542e5..87298f317d21697617a4cd50a6a539540034a4f8 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -1,11 +1,11 @@ use std::borrow::Cow; -use std::cell::Cell; use std::collections::{HashMap, HashSet}; use std::iter::repeat; use std::rc::Rc; use std::time::Duration; use ::tarantool::fiber; +use ::tarantool::fiber::r#async::watch; use ::tarantool::space::UpdateOps; use ::tarantool::util::IntoClones as _; @@ -37,14 +37,13 @@ impl Loop { async fn iter_fn( Args { - status, storage, raft_storage, }: &Args, - State { pool }: &mut State, + State { status, pool }: &mut State, ) -> FlowControl { if !status.get().raft_state.is_leader() { - event::wait(Event::StatusChanged).expect("Events system must be initialized"); + status.changed().await.unwrap(); return Continue; } @@ -647,17 +646,17 @@ impl Loop { } pub fn start( - status: Rc<Cell<Status>>, + status: watch::Receiver<Status>, storage: Clusterwide, raft_storage: RaftSpaceAccess, ) -> Self { let args = Args { - status, storage, raft_storage, }; let state = State { + status, pool: ConnectionPool::builder(args.storage.clone()) .call_timeout(Duration::from_secs(1)) .connect_timeout(Duration::from_millis(500)) @@ -676,12 +675,12 @@ pub struct Loop { } struct Args { - status: Rc<Cell<Status>>, storage: Clusterwide, raft_storage: RaftSpaceAccess, } struct State { + status: watch::Receiver<Status>, pool: ConnectionPool, } diff --git a/src/traft/event.rs b/src/traft/event.rs index 98109cbb05e124fd47592ca9a05b9ae0a1b80579..1f73e941721386ba5252a50a916d2c6ab768a118 100644 --- a/src/traft/event.rs +++ b/src/traft/event.rs @@ -24,7 +24,6 @@ pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>; JointStateEnter = "raft.joint-state-enter", JointStateLeave = "raft.joint-state-leave", JointStateDrop = "raft.joint-state-drop", - StatusChanged = "raft.status-changed", TopologyChanged = "raft.topology-changed", RaftLoopNeeded = "raft.loop-needed", RaftEntryApplied = "raft.entry-applied", diff --git a/src/traft/node.rs b/src/traft/node.rs index b5d75ed5b1c741b291bd096e2f98e8f4950b41ab..b15397d56f59de7c0949545099f849ba6c5e9c88 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -12,7 +12,7 @@ use ::raft::StorageError; use ::raft::INVALID_ID; use ::tarantool::error::{TarantoolError, TransactionError}; use ::tarantool::fiber; -use ::tarantool::fiber::r#async::oneshot; +use ::tarantool::fiber::r#async::{oneshot, watch}; use ::tarantool::fiber::{Cond, Mutex}; use ::tarantool::proc; use ::tarantool::tlua; @@ -122,7 +122,7 @@ pub struct Node { pub(crate) raft_storage: RaftSpaceAccess, main_loop: MainLoop, _governor_loop: governor::Loop, - status: Rc<Cell<Status>>, + status: watch::Receiver<Status>, } impl std::fmt::Debug for Node { @@ -140,18 +140,13 @@ impl Node { let node_impl = NodeImpl::new(storage.clone(), raft_storage.clone())?; let raft_id = node_impl.raft_id(); - let status = Rc::new(Cell::new(Status { - id: raft_id, - leader_id: None, - term: traft::INIT_RAFT_TERM, - raft_state: RaftState::Follower, - })); + let status = node_impl.status.subscribe(); let node_impl = Rc::new(Mutex::new(node_impl)); let node = Node { raft_id, - main_loop: MainLoop::start(status.clone(), node_impl.clone()), // yields + main_loop: MainLoop::start(node_impl.clone()), // yields _governor_loop: governor::Loop::start( status.clone(), storage.clone(), @@ -179,7 +174,7 @@ impl Node { /// Wait for the status to be changed. /// **This function yields** pub fn wait_status(&self) { - event::wait(Event::StatusChanged).expect("Events system wasn't initialized"); + fiber::block_on(self.status.clone().changed()).unwrap(); } /// **This function yields** @@ -328,6 +323,7 @@ struct NodeImpl { raft_storage: RaftSpaceAccess, pool: ConnectionPool, lc: LogicalClock, + status: watch::Sender<Status>, } impl NodeImpl { @@ -358,6 +354,13 @@ impl NodeImpl { let raw_node = RawNode::new(&cfg, raft_storage.clone(), &tlog::root())?; + let (status, _) = watch::channel(Status { + id: raft_id, + leader_id: None, + term: traft::INIT_RAFT_TERM, + raft_state: RaftState::Follower, + }); + Ok(Self { raw_node, notifications: Default::default(), @@ -367,6 +370,7 @@ impl NodeImpl { raft_storage, pool, lc, + status, }) } @@ -775,7 +779,7 @@ impl NodeImpl { /// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49> /// /// This function yields. - fn advance(&mut self, status: &Cell<Status>, topology_changed: &mut bool, expelled: &mut bool) { + fn advance(&mut self, topology_changed: &mut bool, expelled: &mut bool) { // Get the `Ready` with `RawNode::ready` interface. if !self.raw_node.has_ready() { return; @@ -792,11 +796,15 @@ impl NodeImpl { } if let Some(ss) = ready.ss() { - let mut s = status.get(); - s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id); - s.raft_state = ss.raft_state.into(); - status.set(s); - event::broadcast(Event::StatusChanged); + if let Err(e) = self.status.send_modify(|s| { + s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id); + s.raft_state = ss.raft_state.into(); + }) { + tlog!(Warning, "failed updating node status: {e}"; + "leader_id" => ss.leader_id, + "raft_state" => ?ss.raft_state, + ) + } } self.handle_read_states(ready.read_states()); @@ -811,10 +819,9 @@ impl NodeImpl { // Raft HardState changed, and we need to persist it. if let Some(hs) = ready.hs() { self.raft_storage.persist_hard_state(hs).unwrap(); - - let mut s = status.get(); - s.term = hs.term; - status.set(s); + if let Err(e) = self.status.send_modify(|s| s.term = hs.term) { + tlog!(Warning, "failed updating current term: {e}"; "term" => hs.term) + } } Ok(()) @@ -881,7 +888,6 @@ struct MainLoop { } struct MainLoopArgs { - status: Rc<Cell<Status>>, node_impl: Rc<Mutex<NodeImpl>>, } @@ -894,11 +900,11 @@ struct MainLoopState { impl MainLoop { pub const TICK: Duration = Duration::from_millis(100); - fn start(status: Rc<Cell<Status>>, node_impl: Rc<Mutex<NodeImpl>>) -> Self { + fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self { let loop_cond: Rc<Cond> = Default::default(); let stop_flag: Rc<Cell<bool>> = Default::default(); - let args = MainLoopArgs { status, node_impl }; + let args = MainLoopArgs { node_impl }; let initial_state = MainLoopState { next_tick: Instant::now(), loop_cond: loop_cond.clone(), @@ -938,7 +944,7 @@ impl MainLoop { let mut topology_changed = false; let mut expelled = false; - node_impl.advance(&args.status, &mut topology_changed, &mut expelled); // yields + node_impl.advance(&mut topology_changed, &mut expelled); // yields if state.stop_flag.take() { return FlowControl::Break; }