From 9632c1ffd8a2740f729f5bbb90587900a3fafc7c Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 1 Dec 2022 17:38:52 +0300
Subject: [PATCH] refactor(node): split topology_request handlers

The code inside process_topology_request_async has become too complex;
as a result, TopologyRequest has lost its utility.
---
 Cargo.lock                   |   1 +
 Cargo.toml                   |   1 +
 src/on_shutdown.rs           |   2 +-
 src/traft/mod.rs             |  20 -----
 src/traft/node.rs            | 155 +++++++++++++++++++----------------
 src/traft/rpc/expel.rs       |   4 +-
 src/traft/rpc/join.rs        |   2 +-
 src/traft/rpc/update_peer.rs |   2 +-
 8 files changed, 93 insertions(+), 94 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2032ece7e1..f6e301c7d8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -778,6 +778,7 @@ dependencies = [
  "clap",
  "cmake",
  "errno",
+ "futures",
  "indoc",
  "inventory",
  "itertools",
diff --git a/Cargo.toml b/Cargo.toml
index 62a76c735d..aecab13ff0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ itertools = "0.10.3"
 base64 = "0.13"
 lazy_static = "1.4"
 uuid = {version = "1.0", features = ["v3"]}
+futures = "0.3.25"
 
 [dependencies.protobuf]
 version = "2.27"
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index b2ae75a8e8..d8e41eab29 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -92,7 +92,7 @@ fn go_offline() -> traft::Result<()> {
     let wait_before_retry = Duration::from_millis(300);
 
     if leader_id == raft_id {
-        match node.handle_topology_request_and_wait(req.clone().into()) {
+        match node.handle_update_peer_request_and_wait(req.clone()) {
             Err(Error::NotALeader) => {
                 // We've lost leadership while waiting for NodeImpl
                 // mutex. Retry after a small pause.
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index da3f149481..d8aa36e205 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -839,26 +839,6 @@ pub trait ContextCoercion: Serialize + DeserializeOwned {
     }
 }
 
-///////////////////////////////////////////////////////////////////////////////
-/// Request to change cluster topology.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum TopologyRequest {
-    Join(JoinRequest),
-    UpdatePeer(UpdatePeerRequest),
-}
-
-impl From<JoinRequest> for TopologyRequest {
-    fn from(j: JoinRequest) -> Self {
-        Self::Join(j)
-    }
-}
-
-impl From<UpdatePeerRequest> for TopologyRequest {
-    fn from(a: UpdatePeerRequest) -> Self {
-        Self::UpdatePeer(a)
-    }
-}
-
 ///////////////////////////////////////////////////////////////////////////////
 ::tarantool::define_str_enum! {
diff --git a/src/traft/node.rs b/src/traft/node.rs
index c89149d4c6..1f209dea03 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -60,7 +60,6 @@ use crate::traft::LogicalClock;
 use crate::traft::Op;
 use crate::traft::RaftSpaceAccess;
 use crate::traft::Topology;
-use crate::traft::TopologyRequest;
 use crate::traft::{JoinRequest, UpdatePeerRequest};
 
 use super::OpResult;
@@ -259,29 +258,42 @@ impl Node {
         })
     }
 
-    /// Processes the topology request and appends [`Op::PersistPeer`]
-    /// entry to the raft log (if successful).
+    /// Processes the [`JoinRequest`] and appends the necessary
+    /// entries to the raft log (if successful).
     ///
-    /// Returns the resulting peer when the entry is committed.
+    /// Returns the resulting [`Peer`] when the entry is committed.
     ///
     /// Returns an error if the callee node isn't a raft leader.
     ///
     /// **This function yields**
-    pub fn handle_topology_request_and_wait(
+    pub fn handle_join_request_and_wait(
         &self,
-        req: TopologyRequest,
+        req: JoinRequest,
     ) -> traft::Result<Box<traft::Peer>> {
-        let (notify_for_address, notify) =
-            self.raw_operation(|node_impl| node_impl.process_topology_request_async(req))?;
+        let (notify_addr, notify_peer) =
+            self.raw_operation(|node_impl| node_impl.process_join_request_async(req))?;
         block_on(async {
-            if let Some(notify) = notify_for_address {
-                // FIXME: this error should be handled
-                let _ = notify.recv_any().await;
-            }
-            notify.recv().await.map(Box::new)
+            let (addr, peer) = futures::join!(notify_addr.recv_any(), notify_peer.recv());
+            addr?;
+            peer.map(Box::new)
         })
     }
 
+    /// Processes the [`UpdatePeerRequest`] and appends an
+    /// [`Op::PersistPeer`] entry to the raft log (if successful).
+    ///
+    /// Returns `Ok(())` when the entry is committed.
+    ///
+    /// Returns an error if the callee node isn't a raft leader.
+    ///
+    /// **This function yields**
+    pub fn handle_update_peer_request_and_wait(&self, req: UpdatePeerRequest) -> traft::Result<()> {
+        let notify =
+            self.raw_operation(|node_impl| node_impl.process_update_peer_request_async(req))?;
+        block_on(notify.recv_any())?;
+        Ok(())
+    }
+
     /// Only the conf_change_loop on a leader is eligible to call this function.
     ///
     /// **This function yields**
@@ -306,6 +318,7 @@ impl Node {
 
     /// This function **may yield** if `self.node_impl` mutex is acquired.
     #[inline]
+    #[track_caller]
    fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
         let mut node_impl = self.node_impl.lock();
         let res = f(&mut node_impl);
@@ -433,6 +446,8 @@ impl NodeImpl {
         Ok(notify)
     }
 
+    /// **Doesn't yield**
+    #[inline]
     pub fn propose_async<T>(&mut self, op: T) -> Result<Notify, RaftError>
     where
         T: Into<traft::Op>,
@@ -463,58 +478,63 @@ impl NodeImpl {
         }
     }
 
-    /// Processes the topology request and appends [`Op::PersistPeer`]
-    /// entry to the raft log (if successful).
+    /// Processes the [`JoinRequest`] and appends the necessary entries
+    /// to the raft log (if successful).
     ///
     /// Returns an error if the callee node isn't a Raft leader.
     ///
-    /// **This function yields**
-    pub fn process_topology_request_async(
+    /// **This function doesn't yield**
+    pub fn process_join_request_async(
         &mut self,
-        req: TopologyRequest,
-    ) -> traft::Result<(Option<Notify>, Notify)> {
+        req: JoinRequest,
+    ) -> traft::Result<(Notify, Notify)> {
         let topology = self.topology_mut()?;
-        // FIXME: remove this once we introduce some 'async' stuff
-        let notify_for_address;
-        let peer = match req {
-            TopologyRequest::Join(JoinRequest {
-                instance_id,
-                replicaset_id,
-                advertise_address,
-                failure_domain,
-                ..
-            }) => {
-                let (peer, address) = topology
-                    .join(
-                        instance_id,
-                        replicaset_id,
-                        advertise_address,
-                        failure_domain,
-                    )
-                    .map_err(RaftError::ConfChangeError)?;
-                let peer_address = traft::PeerAddress {
-                    raft_id: peer.raft_id,
-                    address,
-                };
-                let op =
-                    OpDML::replace(ClusterwideSpace::Addresses, &peer_address).expect("can't fail");
-                let (lc, notify) = self.schedule_notification();
-                notify_for_address = Some(notify);
-                let ctx = traft::EntryContextNormal::new(lc, op);
-                // Important! Read bellow
-                self.raw_node.propose(ctx.to_bytes(), vec![])?;
-                peer
-            }
-            TopologyRequest::UpdatePeer(req) => {
-                notify_for_address = None;
-                topology
-                    .update_peer(req)
-                    .map_err(RaftError::ConfChangeError)?
-            }
+        let (peer, address) = topology
+            .join(
+                req.instance_id,
+                req.replicaset_id,
+                req.advertise_address,
+                req.failure_domain,
+            )
+            .map_err(RaftError::ConfChangeError)?;
+        let peer_address = traft::PeerAddress {
+            raft_id: peer.raft_id,
+            address,
         };
-        let (lc, notify) = self.schedule_notification();
-        let ctx = traft::EntryContextNormal::new(lc, OpPersistPeer::new(peer));
+        let op_addr =
+            OpDML::replace(ClusterwideSpace::Addresses, &peer_address).expect("can't fail");
+        let op_peer = OpPersistPeer::new(peer);
+        // Important! Calling `raw_node.propose()` may result in a
+        // `ProposalDropped` error, but the topology has already been
+        // modified. The correct handling of this case should be the
+        // following.
+        //
+        // The `topology_cache` should be preserved. It won't be fully
+        // consistent anymore, but that's bearable. (TODO: examine how
+        // the particular requests are handled.) At least it doesn't
+        // differ much from the case of overriding the entry due to a
+        // re-election.
+        //
+        // On the other hand, dropping topology_cache may be much more
+        // harmful. Loss of the uncommitted entries could result in
+        // assigning the same `raft_id` to two different nodes.
+        Ok((self.propose_async(op_addr)?, self.propose_async(op_peer)?))
+    }
 
+    /// Processes the [`UpdatePeerRequest`] and appends an [`Op::PersistPeer`]
+    /// entry to the raft log (if successful).
+    ///
+    /// Returns an error if the callee node isn't a Raft leader.
+    ///
+    /// **This function doesn't yield**
+    pub fn process_update_peer_request_async(
+        &mut self,
+        req: UpdatePeerRequest,
+    ) -> traft::Result<Notify> {
+        let topology = self.topology_mut()?;
+        let peer = topology
+            .update_peer(req)
+            .map_err(RaftError::ConfChangeError)?;
         // Important! Calling `raw_node.propose()` may result in
         // `ProposalDropped` error, but the topology has already been
         // modified. The correct handling of this case should be the
@@ -530,9 +550,7 @@
         // harmful. Loss of the uncommitted entries could result in
         // assigning the same `raft_id` to a two different nodes.
         //
-        self.raw_node.propose(ctx.to_bytes(), vec![])?;
-
-        Ok((notify_for_address, notify))
+        Ok(self.propose_async(OpPersistPeer::new(peer))?)
} fn propose_conf_change_async( @@ -1118,7 +1136,7 @@ fn raft_conf_change_loop( "current_grade" => %req.current_grade.expect("just set"), "instance_id" => %req.instance_id, ); - if let Err(e) = node.handle_topology_request_and_wait(req.into()) { + if let Err(e) = node.handle_update_peer_request_and_wait(req) { tlog!(Warning, "failed handling UpdatePeerRequest: {e}"; "instance_id" => %peer.instance_id, @@ -1209,12 +1227,11 @@ fn raft_conf_change_loop( .with_current_grade(CurrentGrade::raft_synced(peer.target_grade.incarnation)); global() .expect("can't be deinitialized") - .handle_topology_request_and_wait(req.into()) + .handle_update_peer_request_and_wait(req) }); match res { - Ok(peer) => { + Ok(()) => { tlog!(Info, "raft sync processed"); - debug_assert!(peer.current_grade == CurrentGradeVariant::RaftSynced); } Err(e) => { tlog!(Warning, "raft sync failed: {e}"; @@ -1272,7 +1289,7 @@ fn raft_conf_change_loop( let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id) .with_current_grade(CurrentGrade::replicated(peer.target_grade.incarnation)); - node.handle_topology_request_and_wait(req.into())?; + node.handle_update_peer_request_and_wait(req)?; Ok(()) })(); @@ -1368,7 +1385,7 @@ fn raft_conf_change_loop( .with_current_grade(CurrentGrade::sharding_initialized( peer.target_grade.incarnation, )); - node.handle_topology_request_and_wait(req.into())?; + node.handle_update_peer_request_and_wait(req)?; if !vshard_bootstrapped { // TODO: if this fails, it will only rerun next time vshard @@ -1469,7 +1486,7 @@ fn raft_conf_change_loop( let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id) .with_current_grade(CurrentGrade::online(peer.target_grade.incarnation)); - node.handle_topology_request_and_wait(req.into())?; + node.handle_update_peer_request_and_wait(req)?; Ok(()) })() } else { @@ -1489,7 +1506,7 @@ fn raft_conf_change_loop( let cluster_id = cluster_id.clone(); let req = UpdatePeerRequest::new(instance_id.clone(), cluster_id) .with_current_grade(CurrentGrade::online(target_grade.incarnation)); - node.handle_topology_request_and_wait(req.into())?; + node.handle_update_peer_request_and_wait(req)?; // TODO: change `Info` to `Debug` tlog!(Info, "peer is online"; "instance_id" => &**instance_id); } diff --git a/src/traft/rpc/expel.rs b/src/traft/rpc/expel.rs index 3883176ecb..dc39dd37e9 100644 --- a/src/traft/rpc/expel.rs +++ b/src/traft/rpc/expel.rs @@ -22,9 +22,9 @@ crate::define_rpc_request! { return Err(Error::NotALeader); } - let req2 = UpdatePeerRequest::new(req.instance_id, req.cluster_id) + let req = UpdatePeerRequest::new(req.instance_id, req.cluster_id) .with_target_grade(traft::TargetGradeVariant::Expelled); - node.handle_topology_request_and_wait(req2.into())?; + node.handle_update_peer_request_and_wait(req)?; Ok(Response {}) } diff --git a/src/traft/rpc/join.rs b/src/traft/rpc/join.rs index f6acb64d8d..665aaf27e3 100644 --- a/src/traft/rpc/join.rs +++ b/src/traft/rpc/join.rs @@ -27,7 +27,7 @@ crate::define_rpc_request! { }); } - match node.handle_topology_request_and_wait(req.into()) { + match node.handle_join_request_and_wait(req) { Ok(peer) => { let mut box_replication = vec![]; for replica in node.storage.peers.replicaset_peers(&peer.replicaset_id)? { diff --git a/src/traft/rpc/update_peer.rs b/src/traft/rpc/update_peer.rs index da3e763206..266b343d43 100644 --- a/src/traft/rpc/update_peer.rs +++ b/src/traft/rpc/update_peer.rs @@ -28,7 +28,7 @@ crate::define_rpc_request! 
{ "current_grade" => %current_grade, ); } - match node.handle_topology_request_and_wait(req.into()) { + match node.handle_update_peer_request_and_wait(req) { Ok(_) => Ok(Response::Ok {}), Err(Error::NotALeader) => Ok(Response::ErrNotALeader), Err(e) => Err(e), -- GitLab
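
A minimal, self-contained sketch (not picodata code) of the pattern this
patch introduces: `process_*_async` only schedules raft proposals and hands
back notification handles without yielding, while
`handle_join_request_and_wait` then awaits both notifications concurrently
via `futures::join!` instead of discarding the address result as the old
code did. The sketch uses futures 0.3 oneshot channels as a stand-in for
the crate's `Notify` type; the mock `NodeImpl` and all names below are
illustrative assumptions, not picodata APIs.

    use futures::channel::oneshot;
    use futures::executor::block_on;

    struct NodeImpl {
        // Senders that the raft main loop would fire once the
        // corresponding entries are committed.
        pending: Vec<oneshot::Sender<u64>>,
    }

    impl NodeImpl {
        /// Doesn't yield: proposes an entry and immediately returns a
        /// notification handle, like `propose_async` in the patch.
        fn propose_async(&mut self, _op: &str) -> oneshot::Receiver<u64> {
            let (tx, rx) = oneshot::channel();
            self.pending.push(tx);
            rx
        }

        /// Doesn't yield: schedules two entries (peer address + peer),
        /// mirroring `process_join_request_async`.
        fn process_join_request_async(
            &mut self,
        ) -> (oneshot::Receiver<u64>, oneshot::Receiver<u64>) {
            (self.propose_async("addr"), self.propose_async("peer"))
        }
    }

    fn main() {
        let mut node = NodeImpl { pending: vec![] };
        let (notify_addr, notify_peer) = node.process_join_request_async();

        // Pretend the raft loop commits both entries at indexes 10 and 11.
        for (i, tx) in node.pending.drain(..).enumerate() {
            tx.send(10 + i as u64).unwrap();
        }

        // Yields: like `handle_join_request_and_wait`, await both
        // notifications concurrently, so neither error is silently
        // dropped (the old code did `let _ = notify.recv_any().await`).
        let peer = block_on(async {
            let (addr, peer) = futures::join!(notify_addr, notify_peer);
            addr?; // fail if the address entry was never committed
            peer // then yield the committed peer value
        })
        .expect("both entries committed");
        assert_eq!(peer, 11);
    }

In this model, `process_update_peer_request_async` is just the single-notify
special case of the same pattern: one `propose_async` call, with the handler
side doing `block_on(notify.recv_any())`.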