From b0c93713a5c6213bb3c35e501184d1469a782ce1 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 1 Dec 2022 17:38:52 +0300
Subject: [PATCH] refactor(node): split topology_request handlers

The code inside process_topology_request_async has become too complex,
so it is split into separate join and update-peer handlers. As a
result, the TopologyRequest enum has lost its utility and is removed.
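
For reference, a rough sketch of how call sites migrate (not part of
this patch; the wrapper function below is hypothetical and error
handling is elided):

    // Minimal sketch, assuming a `node: &Node` and already
    // constructed requests.
    use crate::traft::{self, node::Node, JoinRequest, UpdatePeerRequest};

    fn migrate_example(
        node: &Node,
        join: JoinRequest,
        update: UpdatePeerRequest,
    ) -> traft::Result<()> {
        // Join requests go through their own handler and still return
        // the resulting Peer once the entries are committed.
        let _peer: Box<traft::Peer> = node.handle_join_request_and_wait(join)?;
        // Update-peer requests resolve to () when the entry is committed.
        node.handle_update_peer_request_and_wait(update)?;
        Ok(())
    }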
---
 Cargo.lock                   |   1 +
 Cargo.toml                   |   1 +
 src/on_shutdown.rs           |   2 +-
 src/traft/mod.rs             |  20 -----
 src/traft/node.rs            | 155 +++++++++++++++++++----------------
 src/traft/rpc/expel.rs       |   4 +-
 src/traft/rpc/join.rs        |   2 +-
 src/traft/rpc/update_peer.rs |   2 +-
 8 files changed, 93 insertions(+), 94 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2032ece7e1..f6e301c7d8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -778,6 +778,7 @@ dependencies = [
  "clap",
  "cmake",
  "errno",
+ "futures",
  "indoc",
  "inventory",
  "itertools",
diff --git a/Cargo.toml b/Cargo.toml
index 62a76c735d..aecab13ff0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ itertools = "0.10.3"
 base64 = "0.13"
 lazy_static = "1.4"
 uuid = {version = "1.0", features = ["v3"]}
+futures = "0.3.25"
 
 [dependencies.protobuf]
 version = "2.27"
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index b2ae75a8e8..d8e41eab29 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -92,7 +92,7 @@ fn go_offline() -> traft::Result<()> {
         let wait_before_retry = Duration::from_millis(300);
 
         if leader_id == raft_id {
-            match node.handle_topology_request_and_wait(req.clone().into()) {
+            match node.handle_update_peer_request_and_wait(req.clone()) {
                 Err(Error::NotALeader) => {
                     // We've lost leadership while waiting for NodeImpl
                     // mutex. Retry after a small pause.
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index da3f149481..d8aa36e205 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -839,26 +839,6 @@ pub trait ContextCoercion: Serialize + DeserializeOwned {
     }
 }
 
-///////////////////////////////////////////////////////////////////////////////
-/// Request to change cluster topology.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum TopologyRequest {
-    Join(JoinRequest),
-    UpdatePeer(UpdatePeerRequest),
-}
-
-impl From<JoinRequest> for TopologyRequest {
-    fn from(j: JoinRequest) -> Self {
-        Self::Join(j)
-    }
-}
-
-impl From<UpdatePeerRequest> for TopologyRequest {
-    fn from(a: UpdatePeerRequest) -> Self {
-        Self::UpdatePeer(a)
-    }
-}
-
 ///////////////////////////////////////////////////////////////////////////////
 
 ::tarantool::define_str_enum! {
diff --git a/src/traft/node.rs b/src/traft/node.rs
index c89149d4c6..1f209dea03 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -60,7 +60,6 @@ use crate::traft::LogicalClock;
 use crate::traft::Op;
 use crate::traft::RaftSpaceAccess;
 use crate::traft::Topology;
-use crate::traft::TopologyRequest;
 use crate::traft::{JoinRequest, UpdatePeerRequest};
 
 use super::OpResult;
@@ -259,29 +258,42 @@ impl Node {
         })
     }
 
-    /// Processes the topology request and appends [`Op::PersistPeer`]
-    /// entry to the raft log (if successful).
+    /// Processes a [`JoinRequest`] and appends the necessary
+    /// entries to the raft log (if successful).
     ///
-    /// Returns the resulting peer when the entry is committed.
+    /// Returns the resulting [`Peer`] when the entry is committed.
     ///
     /// Returns an error if the callee node isn't a raft leader.
     ///
     /// **This function yields**
-    pub fn handle_topology_request_and_wait(
+    pub fn handle_join_request_and_wait(
         &self,
-        req: TopologyRequest,
+        req: JoinRequest,
     ) -> traft::Result<Box<traft::Peer>> {
-        let (notify_for_address, notify) =
-            self.raw_operation(|node_impl| node_impl.process_topology_request_async(req))?;
+        let (notify_addr, notify_peer) =
+            self.raw_operation(|node_impl| node_impl.process_join_request_async(req))?;
         block_on(async {
-            if let Some(notify) = notify_for_address {
-                // FIXME: this error should be handled
-                let _ = notify.recv_any().await;
-            }
-            notify.recv().await.map(Box::new)
+            let (addr, peer) = futures::join!(notify_addr.recv_any(), notify_peer.recv());
+            addr?;
+            peer.map(Box::new)
         })
     }
 
+    /// Processes an [`UpdatePeerRequest`] and appends an
+    /// [`Op::PersistPeer`] entry to the raft log (if successful).
+    ///
+    /// Returns `Ok(())` when the entry is committed.
+    ///
+    /// Returns an error if the callee node isn't a raft leader.
+    ///
+    /// **This function yields**
+    pub fn handle_update_peer_request_and_wait(&self, req: UpdatePeerRequest) -> traft::Result<()> {
+        let notify =
+            self.raw_operation(|node_impl| node_impl.process_update_peer_request_async(req))?;
+        block_on(notify.recv_any())?;
+        Ok(())
+    }
+
     /// Only the conf_change_loop on a leader is eligible to call this function.
     ///
     /// **This function yields**
@@ -306,6 +318,7 @@ impl Node {
 
     /// This function **may yield** if `self.node_impl` mutex is acquired.
     #[inline]
+    #[track_caller]
     fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
         let mut node_impl = self.node_impl.lock();
         let res = f(&mut node_impl);
@@ -433,6 +446,8 @@ impl NodeImpl {
         Ok(notify)
     }
 
+    /// **Doesn't yield**
+    #[inline]
     pub fn propose_async<T>(&mut self, op: T) -> Result<Notify, RaftError>
     where
         T: Into<traft::Op>,
@@ -463,58 +478,63 @@ impl NodeImpl {
         }
     }
 
-    /// Processes the topology request and appends [`Op::PersistPeer`]
-    /// entry to the raft log (if successful).
+    /// Processes a [`JoinRequest`] and appends the necessary entries
+    /// to the raft log (if successful).
     ///
     /// Returns an error if the callee node isn't a Raft leader.
     ///
-    /// **This function yields**
-    pub fn process_topology_request_async(
+    /// **This function doesn't yield**
+    pub fn process_join_request_async(
         &mut self,
-        req: TopologyRequest,
-    ) -> traft::Result<(Option<Notify>, Notify)> {
+        req: JoinRequest,
+    ) -> traft::Result<(Notify, Notify)> {
         let topology = self.topology_mut()?;
-        // FIXME: remove this once we introduce some 'async' stuff
-        let notify_for_address;
-        let peer = match req {
-            TopologyRequest::Join(JoinRequest {
-                instance_id,
-                replicaset_id,
-                advertise_address,
-                failure_domain,
-                ..
-            }) => {
-                let (peer, address) = topology
-                    .join(
-                        instance_id,
-                        replicaset_id,
-                        advertise_address,
-                        failure_domain,
-                    )
-                    .map_err(RaftError::ConfChangeError)?;
-                let peer_address = traft::PeerAddress {
-                    raft_id: peer.raft_id,
-                    address,
-                };
-                let op =
-                    OpDML::replace(ClusterwideSpace::Addresses, &peer_address).expect("can't fail");
-                let (lc, notify) = self.schedule_notification();
-                notify_for_address = Some(notify);
-                let ctx = traft::EntryContextNormal::new(lc, op);
-                // Important! Read bellow
-                self.raw_node.propose(ctx.to_bytes(), vec![])?;
-                peer
-            }
-            TopologyRequest::UpdatePeer(req) => {
-                notify_for_address = None;
-                topology
-                    .update_peer(req)
-                    .map_err(RaftError::ConfChangeError)?
-            }
+        let (peer, address) = topology
+            .join(
+                req.instance_id,
+                req.replicaset_id,
+                req.advertise_address,
+                req.failure_domain,
+            )
+            .map_err(RaftError::ConfChangeError)?;
+        let peer_address = traft::PeerAddress {
+            raft_id: peer.raft_id,
+            address,
         };
-        let (lc, notify) = self.schedule_notification();
-        let ctx = traft::EntryContextNormal::new(lc, OpPersistPeer::new(peer));
+        let op_addr =
+            OpDML::replace(ClusterwideSpace::Addresses, &peer_address).expect("can't fail");
+        let op_peer = OpPersistPeer::new(peer);
+        // Important! Calling `raw_node.propose()` may result in
+        // `ProposalDropped` error, but the topology has already been
+        // modified. The correct handling of this case should be the
+        // following.
+        //
+        // The `topology_cache` should be preserved. It won't be fully
+        // consistent anymore, but that's bearable. (TODO: examine how
+        // the particular requests are handled). At least it doesn't
+        // much differ from the case of overriding the entry due to a
+        // re-election.
+        //
+        // On the other hand, dropping topology_cache may be much more
+        // harmful. Loss of the uncommitted entries could result in
+        // assigning the same `raft_id` to two different nodes.
+        Ok((self.propose_async(op_addr)?, self.propose_async(op_peer)?))
+    }
 
+    /// Processes an [`UpdatePeerRequest`] and appends an [`Op::PersistPeer`]
+    /// entry to the raft log (if successful).
+    ///
+    /// Returns an error if the callee node isn't a Raft leader.
+    ///
+    /// **This function doesn't yield**
+    pub fn process_update_peer_request_async(
+        &mut self,
+        req: UpdatePeerRequest,
+    ) -> traft::Result<Notify> {
+        let topology = self.topology_mut()?;
+        let peer = topology
+            .update_peer(req)
+            .map_err(RaftError::ConfChangeError)?;
         // Important! Calling `raw_node.propose()` may result in
         // `ProposalDropped` error, but the topology has already been
         // modified. The correct handling of this case should be the
@@ -530,9 +550,7 @@ impl NodeImpl {
         // harmful. Loss of the uncommitted entries could result in
         // assigning the same `raft_id` to a two different nodes.
         //
-        self.raw_node.propose(ctx.to_bytes(), vec![])?;
-
-        Ok((notify_for_address, notify))
+        Ok(self.propose_async(OpPersistPeer::new(peer))?)
     }
 
     fn propose_conf_change_async(
@@ -1118,7 +1136,7 @@ fn raft_conf_change_loop(
                 "current_grade" => %req.current_grade.expect("just set"),
                 "instance_id" => %req.instance_id,
             );
-            if let Err(e) = node.handle_topology_request_and_wait(req.into()) {
+            if let Err(e) = node.handle_update_peer_request_and_wait(req) {
                 tlog!(Warning,
                     "failed handling UpdatePeerRequest: {e}";
                     "instance_id" => %peer.instance_id,
@@ -1209,12 +1227,11 @@ fn raft_conf_change_loop(
                     .with_current_grade(CurrentGrade::raft_synced(peer.target_grade.incarnation));
                 global()
                     .expect("can't be deinitialized")
-                    .handle_topology_request_and_wait(req.into())
+                    .handle_update_peer_request_and_wait(req)
             });
             match res {
-                Ok(peer) => {
+                Ok(()) => {
                     tlog!(Info, "raft sync processed");
-                    debug_assert!(peer.current_grade == CurrentGradeVariant::RaftSynced);
                 }
                 Err(e) => {
                     tlog!(Warning, "raft sync failed: {e}";
@@ -1272,7 +1289,7 @@ fn raft_conf_change_loop(
 
                 let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
                     .with_current_grade(CurrentGrade::replicated(peer.target_grade.incarnation));
-                node.handle_topology_request_and_wait(req.into())?;
+                node.handle_update_peer_request_and_wait(req)?;
 
                 Ok(())
             })();
@@ -1368,7 +1385,7 @@ fn raft_conf_change_loop(
                     .with_current_grade(CurrentGrade::sharding_initialized(
                         peer.target_grade.incarnation,
                     ));
-                node.handle_topology_request_and_wait(req.into())?;
+                node.handle_update_peer_request_and_wait(req)?;
 
                 if !vshard_bootstrapped {
                     // TODO: if this fails, it will only rerun next time vshard
@@ -1469,7 +1486,7 @@ fn raft_conf_change_loop(
 
                     let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
                         .with_current_grade(CurrentGrade::online(peer.target_grade.incarnation));
-                    node.handle_topology_request_and_wait(req.into())?;
+                    node.handle_update_peer_request_and_wait(req)?;
                     Ok(())
                 })()
             } else {
@@ -1489,7 +1506,7 @@ fn raft_conf_change_loop(
                         let cluster_id = cluster_id.clone();
                         let req = UpdatePeerRequest::new(instance_id.clone(), cluster_id)
                             .with_current_grade(CurrentGrade::online(target_grade.incarnation));
-                        node.handle_topology_request_and_wait(req.into())?;
+                        node.handle_update_peer_request_and_wait(req)?;
                         // TODO: change `Info` to `Debug`
                         tlog!(Info, "peer is online"; "instance_id" => &**instance_id);
                     }
diff --git a/src/traft/rpc/expel.rs b/src/traft/rpc/expel.rs
index 3883176ecb..dc39dd37e9 100644
--- a/src/traft/rpc/expel.rs
+++ b/src/traft/rpc/expel.rs
@@ -22,9 +22,9 @@ crate::define_rpc_request! {
             return Err(Error::NotALeader);
         }
 
-        let req2 = UpdatePeerRequest::new(req.instance_id, req.cluster_id)
+        let req = UpdatePeerRequest::new(req.instance_id, req.cluster_id)
             .with_target_grade(traft::TargetGradeVariant::Expelled);
-        node.handle_topology_request_and_wait(req2.into())?;
+        node.handle_update_peer_request_and_wait(req)?;
 
         Ok(Response {})
     }
diff --git a/src/traft/rpc/join.rs b/src/traft/rpc/join.rs
index f6acb64d8d..063fcb8500 100644
--- a/src/traft/rpc/join.rs
+++ b/src/traft/rpc/join.rs
@@ -27,7 +27,7 @@ crate::define_rpc_request! {
             });
         }
 
-        match node.handle_topology_request_and_wait(req.into()) {
+        match node.handle_join_request_and_wait(req.into()) {
             Ok(peer) => {
                 let mut box_replication = vec![];
                 for replica in node.storage.peers.replicaset_peers(&peer.replicaset_id)? {
diff --git a/src/traft/rpc/update_peer.rs b/src/traft/rpc/update_peer.rs
index da3e763206..266b343d43 100644
--- a/src/traft/rpc/update_peer.rs
+++ b/src/traft/rpc/update_peer.rs
@@ -28,7 +28,7 @@ crate::define_rpc_request! {
                 "current_grade" => %current_grade,
             );
         }
-        match node.handle_topology_request_and_wait(req.into()) {
+        match node.handle_update_peer_request_and_wait(req) {
             Ok(_) => Ok(Response::Ok {}),
             Err(Error::NotALeader) => Ok(Response::ErrNotALeader),
             Err(e) => Err(e),
-- 
GitLab