diff --git a/src/traft/node.rs b/src/traft/node.rs
index 80cbc7f52cc06716dadaa1891df9a4aaab2c0a18..58a141fa06176848fea4366eba297e96b572eda6 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -78,7 +78,6 @@ pub struct Node {
     _conf_change_loop: fiber::UnitJoinHandle<'static>,
     status: Rc<RefCell<Status>>,
     raft_loop_cond: Rc<Cond>,
-    topology_cache: CachedCell<RaftTerm, Topology>,
 }
 
 impl std::fmt::Debug for Node {
@@ -135,7 +134,6 @@ impl Node {
                 .proc(conf_change_loop_fn)
                 .start()
                 .unwrap(),
-            topology_cache: CachedCell::new(),
             storage,
         };
 
@@ -165,29 +163,8 @@ impl Node {
 
     /// **This function yields**
    pub fn wait_for_read_state(&self, timeout: Duration) -> Result<RaftIndex, Error> {
-        self.raw_operation(|inner_node| {
-            // In some states `raft-rs` ignores the ReadIndex request.
-            // Check it preliminary, don't wait for the timeout.
-            //
-            // See for details:
-            // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
-            // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
-            let leader_doesnt_exist = inner_node.raw_node.raft.leader_id == INVALID_ID;
-            let term_just_started = inner_node.raw_node.raft.state == RaftStateRole::Leader
-                && !inner_node.raw_node.raft.commit_to_current_term();
-            if leader_doesnt_exist || term_just_started {
-                return Err(RaftError::ProposalDropped.into());
-            }
-
-            let (lc, notify) = inner_node.schedule_notification();
-            // read_index puts this context into an Entry,
-            // so we've got to compose full EntryContext,
-            // despite single LogicalClock would be enough
-            let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
-            inner_node.raw_node.read_index(ctx.to_bytes());
-            Ok(notify)
-        })?
-        .recv_timeout::<RaftIndex>(timeout)
+        let notify = self.raw_operation(|inner_node| inner_node.read_state_async())?;
+        notify.recv_timeout::<RaftIndex>(timeout)
     }
 
     /// Propose an operation and wait for its result.
@@ -197,20 +174,15 @@ impl Node {
         op: T,
         timeout: Duration,
     ) -> Result<T::Result, Error> {
-        self.raw_operation(|inner_node| {
-            let (lc, notify) = inner_node.schedule_notification();
-            let ctx = traft::EntryContextNormal::new(lc, op);
-            inner_node.raw_node.propose(ctx.to_bytes(), vec![])?;
-            Ok(notify)
-        })?
-        .recv_timeout::<T::Result>(timeout)
+        let notify = self.raw_operation(|inner_node| inner_node.propose_async(op))?;
+        notify.recv_timeout::<T::Result>(timeout)
     }
 
     /// Become a candidate and wait for a main loop round so that there's a
     /// chance we become the leader.
     /// **This function yields**
     pub fn campaign_and_yield(&self) -> Result<(), Error> {
-        self.raw_operation(|inner_node| inner_node.raw_node.campaign().map_err(Into::into))?;
+        self.raw_operation(|inner_node| inner_node.campaign())?;
         // Even though we don't expect a response, we still should let the
         // main_loop do an iteration. Without rescheduling, the Ready state
         // wouldn't be processed, the Status wouldn't be updated, and some
@@ -221,18 +193,9 @@ impl Node {
 
     /// **This function yields**
     pub fn step_and_yield(&self, msg: raft::Message) {
-        self.raw_operation(|inner_node| {
-            if msg.to != inner_node.raw_node.raft.id {
-                return Ok(());
-            }
-            // TODO check it's not a MsgPropose with op::PersistPeer.
-            // TODO check it's not a MsgPropose with ConfChange.
- if let Err(e) = inner_node.raw_node.step(msg) { - tlog!(Error, "{e}"); - } - Ok(()) - }) - .ok(); + self.raw_operation(|inner_node| inner_node.step(msg)) + .map_err(|e| tlog!(Error, "{e}")) + .ok(); // even though we don't expect a response, we still should let the // main_loop do an iteration fiber::reschedule(); @@ -240,13 +203,7 @@ impl Node { /// **This function yields** pub fn tick_and_yield(&self, n_times: u32) { - self.raw_operation(|inner_node| { - for _ in 0..n_times { - inner_node.raw_node.tick(); - } - Ok(()) - }) - .ok(); + self.raw_operation(|inner_node| inner_node.tick(n_times)); // even though we don't expect a response, we still should let the // main_loop do an iteration fiber::reschedule(); @@ -273,51 +230,9 @@ impl Node { &self, req: TopologyRequest, ) -> Result<traft::Peer, Error> { - self.raw_operation(|inner_node| { - if inner_node.raw_node.raft.state != RaftStateRole::Leader { - return Err(RaftError::ConfChangeError("not a leader".into()).into()); - } - - let mut topology = self - .topology_cache - .pop(&inner_node.raw_node.raft.term) - .unwrap_or_else(|| { - let peers = Storage::peers().unwrap(); - let replication_factor = Storage::replication_factor().unwrap().unwrap(); - Topology::from_peers(peers).with_replication_factor(replication_factor) - }); - - let peer_result = match req { - TopologyRequest::Join(JoinRequest { - instance_id, - replicaset_id, - advertise_address, - failure_domain, - .. - }) => topology.join( - instance_id, - replicaset_id, - advertise_address, - failure_domain, - ), - TopologyRequest::UpdatePeer(req) => topology.update_peer(req), - }; - - let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => { - self.topology_cache.put(inner_node.raw_node.raft.term, topology); - return Err(RaftError::ConfChangeError(e).into()); - }); - - peer.commit_index = inner_node.raw_node.raft.raft_log.last_index() + 1; - - let (lc, notify) = inner_node.schedule_notification(); - let ctx = traft::EntryContextNormal::new(lc, Op::PersistPeer { peer }); - inner_node.raw_node.propose(ctx.to_bytes(), vec![])?; - self.topology_cache - .put(inner_node.raw_node.raft.term, topology); - Ok(notify) - })? - .recv::<Peer>() + let notify = + self.raw_operation(|inner_node| inner_node.process_topology_request_async(req))?; + notify.recv::<Peer>() } /// Only the conf_change_loop on a leader is eligible to call this function. @@ -328,73 +243,14 @@ impl Node { term: RaftTerm, conf_change: raft::ConfChangeV2, ) -> Result<(), Error> { - self.raw_operation(|inner_node| { - // In some states proposing a ConfChange is impossible. - // Check if there's a reason to reject it. - - #[allow(clippy::never_loop)] - let reason: Option<&str> = loop { - // Checking leadership is only needed for the - // correct latch management. It doesn't affect - // raft correctness. Checking the instance is a - // leader makes sure the proposed `ConfChange` - // is appended to the raft log immediately - // instead of sending `MsgPropose` over the - // network. - if inner_node.raw_node.raft.state != RaftStateRole::Leader { - break Some("not a leader"); - } - - if term != inner_node.raw_node.raft.term { - break Some("raft term mismatch"); - } - - // Without this check the node would silently ignore the conf change. 
-                // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
-                if inner_node.raw_node.raft.has_pending_conf() {
-                    break Some("already has pending confchange");
-                }
-
-                break None;
-            };
-
-            if let Some(e) = reason {
-                return Err(RaftError::ConfChangeError(e.into()).into());
-            }
-
-            let prev_index = inner_node.raw_node.raft.raft_log.last_index();
-            inner_node
-                .raw_node
-                .propose_conf_change(vec![], conf_change)?;
-
-            // oops, current instance isn't actually a leader
-            // (which is impossible in theory, but we're not
-            // sure in practice) and sent the ConfChange message
-            // to the raft network instead of appending it to the
-            // raft log.
-            let last_index = inner_node.raw_node.raft.raft_log.last_index();
-            assert_eq!(last_index, prev_index + 1);
-
-            let (rx, tx) = Notify::new().into_clones();
-            with_joint_state_latch(|joint_state_latch| {
-                assert!(joint_state_latch.take().is_none());
-                event::broadcast(Event::JointStateEnter);
-                joint_state_latch.set(Some(JointStateLatch {
-                    index: last_index,
-                    notify: tx,
-                }));
-            });
-            Ok(rx)
-        })?
-        .recv()
+        let notify = self
+            .raw_operation(|inner_node| inner_node.propose_conf_change_async(term, conf_change))?;
+        notify.recv()
     }
 
     /// This function **may yield** if `self.raw_node` is acquired.
     #[inline]
-    fn raw_operation<R>(
-        &self,
-        f: impl FnOnce(&mut InnerNode) -> Result<R, Error>,
-    ) -> Result<R, Error> {
+    fn raw_operation<R>(&self, f: impl FnOnce(&mut InnerNode) -> R) -> R {
         let mut inner_node = self.inner_node.lock();
         let res = f(&mut *inner_node);
         drop(inner_node);
@@ -411,6 +267,7 @@ impl Node {
 struct InnerNode {
     pub raw_node: RawNode,
     pub notifications: HashMap<LogicalClock, Notify>,
+    topology_cache: CachedCell<RaftTerm, Topology>,
     lc: LogicalClock,
 }
 
@@ -435,6 +292,7 @@ impl InnerNode {
         Ok(Self {
             raw_node,
             notifications: Default::default(),
+            topology_cache: CachedCell::new(),
             lc: {
                 let gen = storage.gen().unwrap().unwrap_or(0) + 1;
                 storage.persist_gen(gen).unwrap();
@@ -447,6 +305,175 @@ impl InnerNode {
         self.raw_node.raft.id
     }
 
+    pub fn read_state_async(&mut self) -> Result<Notify, RaftError> {
+        // In some states `raft-rs` ignores the ReadIndex request.
+        // Check this beforehand, don't wait for the timeout.
+        //
+        // See for details:
+        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
+        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
+        let leader_doesnt_exist = self.raw_node.raft.leader_id == INVALID_ID;
+        let term_just_started = // ...
+            self.raw_node.raft.state == RaftStateRole::Leader
+            && !self.raw_node.raft.commit_to_current_term();
+
+        if leader_doesnt_exist || term_just_started {
+            return Err(RaftError::ProposalDropped);
+        }
+
+        let (lc, notify) = self.schedule_notification();
+        // read_index puts this context into an Entry,
+        // so we've got to compose a full EntryContext,
+        // even though a single LogicalClock would be enough
+        let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
+        self.raw_node.read_index(ctx.to_bytes());
+        Ok(notify)
+    }
+
+    pub fn propose_async<T>(&mut self, op: T) -> Result<Notify, RaftError>
+    where
+        T: Into<traft::Op>,
+    {
+        let (lc, notify) = self.schedule_notification();
+        let ctx = traft::EntryContextNormal::new(lc, op);
+        self.raw_node.propose(ctx.to_bytes(), vec![])?;
+        Ok(notify)
+    }
+
+    pub fn campaign(&mut self) -> Result<(), RaftError> {
+        self.raw_node.campaign()
+    }
+
+    pub fn step(&mut self, msg: raft::Message) -> Result<(), RaftError> {
+        if msg.to != self.raft_id() {
+            return Ok(());
+        }
+
+        // TODO check it's not a MsgPropose with op::PersistPeer.
+        // TODO check it's not a MsgPropose with ConfChange.
+        self.raw_node.step(msg)
+    }
+
+    pub fn tick(&mut self, n_times: u32) {
+        for _ in 0..n_times {
+            self.raw_node.tick();
+        }
+    }
+
+    /// Process the topology request and propose a [`PersistPeer`] entry
+    /// if appropriate.
+    ///
+    /// Returns an error if the current node isn't the Raft leader.
+    ///
+    /// **This function yields**
+    pub fn process_topology_request_async(
+        &mut self,
+        req: TopologyRequest,
+    ) -> Result<Notify, RaftError> {
+        if self.raw_node.raft.state != RaftStateRole::Leader {
+            return Err(RaftError::ConfChangeError("not a leader".into()));
+        }
+
+        let mut topology = self
+            .topology_cache
+            .pop(&self.raw_node.raft.term)
+            .unwrap_or_else(|| {
+                let peers = Storage::peers().unwrap();
+                let replication_factor = Storage::replication_factor().unwrap().unwrap();
+                Topology::from_peers(peers).with_replication_factor(replication_factor)
+            });
+
+        let peer_result = match req {
+            TopologyRequest::Join(JoinRequest {
+                instance_id,
+                replicaset_id,
+                advertise_address,
+                failure_domain,
+                ..
+            }) => topology.join(
+                instance_id,
+                replicaset_id,
+                advertise_address,
+                failure_domain,
+            ),
+            TopologyRequest::UpdatePeer(req) => topology.update_peer(req),
+        };
+
+        let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => {
+            self.topology_cache.put(self.raw_node.raft.term, topology);
+            return Err(RaftError::ConfChangeError(e));
+        });
+
+        peer.commit_index = self.raw_node.raft.raft_log.last_index() + 1;
+
+        let (lc, notify) = self.schedule_notification();
+        let ctx = traft::EntryContextNormal::new(lc, Op::PersistPeer { peer });
+        self.raw_node.propose(ctx.to_bytes(), vec![])?;
+        self.topology_cache.put(self.raw_node.raft.term, topology);
+        Ok(notify)
+    }
+
+    fn propose_conf_change_async(
+        &mut self,
+        term: RaftTerm,
+        conf_change: raft::ConfChangeV2,
+    ) -> Result<Notify, RaftError> {
+        // In some states proposing a ConfChange is impossible.
+        // Check if there's a reason to reject it.
+
+        #[allow(clippy::never_loop)]
+        let reason: Option<&str> = loop {
+            // Checking leadership is only needed for the
+            // correct latch management. It doesn't affect
+            // raft correctness. Checking the instance is a
+            // leader makes sure the proposed `ConfChange`
+            // is appended to the raft log immediately
+            // instead of sending `MsgPropose` over the
+            // network.
+ if self.raw_node.raft.state != RaftStateRole::Leader { + break Some("not a leader"); + } + + if term != self.raw_node.raft.term { + break Some("raft term mismatch"); + } + + // Without this check the node would silently ignore the conf change. + // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026 + if self.raw_node.raft.has_pending_conf() { + break Some("already has pending confchange"); + } + + break None; + }; + + if let Some(e) = reason { + return Err(RaftError::ConfChangeError(e.into())); + } + + let prev_index = self.raw_node.raft.raft_log.last_index(); + self.raw_node.propose_conf_change(vec![], conf_change)?; + let last_index = self.raw_node.raft.raft_log.last_index(); + + // oops, current instance isn't actually a leader + // (which is impossible in theory, but we're not + // sure in practice) and sent the ConfChange message + // to the raft network instead of appending it to the + // raft log. + assert_eq!(last_index, prev_index + 1); + + let (rx, tx) = Notify::new().into_clones(); + with_joint_state_latch(|joint_state_latch| { + assert!(joint_state_latch.take().is_none()); + event::broadcast(Event::JointStateEnter); + joint_state_latch.set(Some(JointStateLatch { + index: last_index, + notify: tx, + })); + }); + Ok(rx) + } + #[inline] fn cleanup_notifications(&mut self) { self.notifications