diff --git a/src/traft/mod.rs b/src/traft/mod.rs index fe4bbdf631624ccee5240c4d59f98ebf257b142a..5a626cd263f42ad1a335c4f029aaffe6bd50eaba 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -284,6 +284,13 @@ pub struct EntryContextNormal { pub op: Op, } +impl EntryContextNormal { + #[inline] + pub fn new(lc: LogicalClock, op: impl Into<Op>) -> Self { + Self { lc, op: op.into() } + } +} + /// [`EntryContext`] of a conf change entry, either `EntryConfChange` or `EntryConfChangeV2` #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct EntryContextConfChange { diff --git a/src/traft/node.rs b/src/traft/node.rs index f288ba3e7468bdd10b6f0c0b7419d7042acdabcc..5613dea7867eb8bccdf3ae72e0d80f6cac78e069 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -41,6 +41,7 @@ use crate::traft::failover; use crate::traft::notify::Notify; use crate::traft::ConnectionPool; use crate::traft::LogicalClock; +use crate::traft::Op; use crate::traft::Storage; use crate::traft::Topology; use crate::traft::TopologyRequest; @@ -168,22 +169,16 @@ impl Node { return Err(RaftError::ProposalDropped.into()); } - let (rx, tx) = Notify::new().into_clones(); - let lc = self.next_lc(); - self.notifications.borrow_mut().insert(lc, tx); + let (lc, notify) = self.add_notify(); // read_index puts this context into an Entry, // so we've got to compose full EntryContext, // despite single LogicalClock would be enough - let ctx = traft::EntryContextNormal { - lc, - op: traft::Op::Nop, - } - .to_bytes(); - raw_node.read_index(ctx); + let ctx = traft::EntryContextNormal::new(lc, Op::Nop); + raw_node.read_index(ctx.to_bytes()); drop(raw_node); event::broadcast(Event::RaftLoopNeeded); - rx.recv_timeout::<u64>(timeout) + notify.recv_timeout::<u64>(timeout) } pub fn propose<T: OpResult + Into<traft::Op>>( @@ -192,14 +187,12 @@ impl Node { timeout: Duration, ) -> Result<T::Result, Error> { let mut raw_node = self.raw_node.lock(); - let (rx, tx) = Notify::new().into_clones(); - let lc = self.next_lc(); - self.notifications.borrow_mut().insert(lc, tx); - let ctx = traft::EntryContextNormal { lc, op: op.into() }.to_bytes(); - raw_node.propose(ctx, vec![])?; + let (lc, notify) = self.add_notify(); + let ctx = traft::EntryContextNormal::new(lc, op); + raw_node.propose(ctx.to_bytes(), vec![])?; drop(raw_node); event::broadcast(Event::RaftLoopNeeded); - rx.recv_timeout::<T::Result>(timeout) + notify.recv_timeout::<T::Result>(timeout) } pub fn campaign(&self) -> Result<(), Error> { @@ -295,19 +288,13 @@ impl Node { peer.commit_index = raw_node.raft.raft_log.last_index() + 1; - let lc = self.next_lc(); - let ctx = traft::EntryContextNormal { - op: traft::Op::PersistPeer { peer }, - lc, - }; - - let (rx, tx) = Notify::new().into_clones(); - self.notifications.borrow_mut().insert(lc, tx); + let (lc, notify) = self.add_notify(); + let ctx = traft::EntryContextNormal::new(lc, Op::PersistPeer { peer }); raw_node.propose(ctx.to_bytes(), vec![])?; self.topology_cache.put(raw_node.raft.term, topology); drop(raw_node); event::broadcast(Event::RaftLoopNeeded); - rx.recv::<Peer>() + notify.recv::<Peer>() } fn propose_conf_change(&self, term: u64, conf_change: raft::ConfChangeV2) -> Result<(), Error> { @@ -370,12 +357,21 @@ impl Node { rx.recv() } + #[inline] fn next_lc(&self) -> LogicalClock { - let mut lc = self.lc.take().expect("it's always Some"); + let mut lc = self.lc.get().expect("it's always Some"); lc.inc(); self.lc.set(Some(lc)); lc } + + #[inline] + fn add_notify(&self) -> (LogicalClock, Notify) { + let (rx, tx) = Notify::new().into_clones(); + let lc = self.next_lc(); + self.notifications.borrow_mut().insert(lc, tx); + (lc, rx) + } } #[derive(Debug)]