From cc59821fca30f655cc5505c379bc22b057c49fcd Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 1 Dec 2022 17:38:52 +0300
Subject: [PATCH] refactor(node): split topology_request handlers

The code inside process_topology_request_async has become too complex,
and the TopologyRequest enum has lost its utility. Replace the single
enum-based handler with dedicated join and update_instance handlers.
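
A rough sketch of how call sites change (join_req and update_req are
placeholder variables, not names from the diff; error handling elided):

    // before: both request kinds funnel through the TopologyRequest
    // enum, and even an update returns the resulting instance
    let instance = node.handle_topology_request_and_wait(join_req.into())?;
    let _ = node.handle_topology_request_and_wait(update_req.into())?;

    // after: one dedicated handler per request type; the update
    // handler now resolves to Ok(()) once the entry is committed
    let instance = node.handle_join_request_and_wait(join_req)?;
    node.handle_update_instance_request_and_wait(update_req)?;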
---
 Cargo.lock                       |   1 +
 Cargo.toml                       |   1 +
 src/on_shutdown.rs               |   2 +-
 src/traft/mod.rs                 |  20 ----
 src/traft/node.rs                | 160 ++++++++++++++++++-------------
 src/traft/rpc/expel.rs           |   4 +-
 src/traft/rpc/join.rs            |   2 +-
 src/traft/rpc/update_instance.rs |   2 +-
 8 files changed, 98 insertions(+), 94 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cceedde7cb..db994cda3c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -798,6 +798,7 @@ dependencies = [
  "clap",
  "cmake",
  "errno",
+ "futures",
  "indoc",
  "inventory",
  "itertools",
diff --git a/Cargo.toml b/Cargo.toml
index 8f91dc4c90..1a0ade1db8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,6 +26,7 @@ base64 = "0.13"
 lazy_static = "1.4"
 uuid = {version = "1.0", features = ["v3"]}
 linkme = "0.2.10"
+futures = "0.3.25"
 
 [dependencies.protobuf]
 version = "2.27"
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index 85c24e4d10..ad52e693e8 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -92,7 +92,7 @@ fn go_offline() -> traft::Result<()> {
         let wait_before_retry = Duration::from_millis(300);
 
         if leader_id == raft_id {
-            match node.handle_topology_request_and_wait(req.clone().into()) {
+            match node.handle_update_instance_request_and_wait(req.clone()) {
                 Err(Error::NotALeader) => {
                     // We've lost leadership while waiting for NodeImpl
                     // mutex. Retry after a small pause.
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 64c18c2230..4369e00399 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -836,26 +836,6 @@ pub trait ContextCoercion: Serialize + DeserializeOwned {
     }
 }
 
-///////////////////////////////////////////////////////////////////////////////
-/// Request to change cluster topology.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum TopologyRequest {
-    Join(join::Request),
-    UpdateInstance(update_instance::Request),
-}
-
-impl From<join::Request> for TopologyRequest {
-    fn from(j: join::Request) -> Self {
-        Self::Join(j)
-    }
-}
-
-impl From<update_instance::Request> for TopologyRequest {
-    fn from(a: update_instance::Request) -> Self {
-        Self::UpdateInstance(a)
-    }
-}
-
 ///////////////////////////////////////////////////////////////////////////////
 
 ::tarantool::define_str_enum! {
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 5293017a2c..2024c0b041 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -60,7 +60,6 @@ use crate::traft::LogicalClock;
 use crate::traft::Op;
 use crate::traft::RaftSpaceAccess;
 use crate::traft::Topology;
-use crate::traft::TopologyRequest;
 
 use super::OpResult;
 use super::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant};
@@ -258,29 +257,45 @@ impl Node {
         })
     }
 
-    /// Processes the topology request and appends [`Op::PersistInstance`]
-    /// entry to the raft log (if successful).
+    /// Processes the [`join::Request`] and appends the necessary
+    /// entries to the raft log (if successful).
     ///
-    /// Returns the resulting instance when the entry is committed.
+    /// Returns the resulting [`Instance`] when the entry is committed.
     ///
     /// Returns an error if the callee node isn't a raft leader.
     ///
     /// **This function yields**
-    pub fn handle_topology_request_and_wait(
+    pub fn handle_join_request_and_wait(
         &self,
-        req: TopologyRequest,
+        req: join::Request,
     ) -> traft::Result<Box<traft::Instance>> {
-        let (notify_for_address, notify) =
-            self.raw_operation(|node_impl| node_impl.process_topology_request_async(req))?;
+        let (notify_addr, notify_instance) =
+            self.raw_operation(|node_impl| node_impl.process_join_request_async(req))?;
         block_on(async {
-            if let Some(notify) = notify_for_address {
-                // FIXME: this error should be handled
-                let _ = notify.recv_any().await;
-            }
-            notify.recv().await.map(Box::new)
+            let (addr, instance) = futures::join!(notify_addr.recv_any(), notify_instance.recv());
+            addr?;
+            instance.map(Box::new)
         })
     }
 
+    /// Processes the [`update_instance::Request`] and appends an
+    /// [`Op::PersistInstance`] entry to the raft log (if successful).
+    ///
+    /// Returns `Ok(())` when the entry is committed.
+    ///
+    /// Returns an error if the callee node isn't a raft leader.
+    ///
+    /// **This function yields**
+    pub fn handle_update_instance_request_and_wait(
+        &self,
+        req: update_instance::Request,
+    ) -> traft::Result<()> {
+        let notify =
+            self.raw_operation(|node_impl| node_impl.process_update_instance_request_async(req))?;
+        block_on(notify.recv_any())?;
+        Ok(())
+    }
+
     /// Only the conf_change_loop on a leader is eligible to call this function.
     ///
     /// **This function yields**
@@ -305,6 +320,7 @@ impl Node {
 
     /// This function **may yield** if `self.node_impl` mutex is acquired.
     #[inline]
+    #[track_caller]
     fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
         let mut node_impl = self.node_impl.lock();
         let res = f(&mut node_impl);
@@ -432,6 +448,8 @@ impl NodeImpl {
         Ok(notify)
     }
 
+    /// **Doesn't yield**
+    #[inline]
     pub fn propose_async<T>(&mut self, op: T) -> Result<Notify, RaftError>
     where
         T: Into<traft::Op>,
@@ -462,58 +480,65 @@ impl NodeImpl {
         }
     }
 
-    /// Processes the topology request and appends [`Op::PersistInstance`]
-    /// entry to the raft log (if successful).
+    /// Processes the [`join::Request`] and appends the necessary entries
+    /// to the raft log (if successful).
     ///
     /// Returns an error if the callee node isn't a Raft leader.
     ///
-    /// **This function yields**
-    pub fn process_topology_request_async(
+    /// **This function doesn't yield**
+    pub fn process_join_request_async(
         &mut self,
-        req: TopologyRequest,
-    ) -> traft::Result<(Option<Notify>, Notify)> {
+        req: join::Request,
+    ) -> traft::Result<(Notify, Notify)> {
         let topology = self.topology_mut()?;
-        // FIXME: remove this once we introduce some 'async' stuff
-        let notify_for_address;
-        let instance = match req {
-            TopologyRequest::Join(join::Request {
-                instance_id,
-                replicaset_id,
-                advertise_address,
-                failure_domain,
-                ..
-            }) => {
-                let (instance, address) = topology
-                    .join(
-                        instance_id,
-                        replicaset_id,
-                        advertise_address,
-                        failure_domain,
-                    )
-                    .map_err(RaftError::ConfChangeError)?;
-                let peer_address = traft::PeerAddress {
-                    raft_id: instance.raft_id,
-                    address,
-                };
-                let op =
-                    OpDML::replace(ClusterwideSpace::Address, &peer_address).expect("can't fail");
-                let (lc, notify) = self.schedule_notification();
-                notify_for_address = Some(notify);
-                let ctx = traft::EntryContextNormal::new(lc, op);
-                // Important! Read bellow
-                self.raw_node.propose(ctx.to_bytes(), vec![])?;
-                instance
-            }
-            TopologyRequest::UpdateInstance(req) => {
-                notify_for_address = None;
-                topology
-                    .update_instance(req)
-                    .map_err(RaftError::ConfChangeError)?
-            }
+        let (instance, address) = topology
+            .join(
+                req.instance_id,
+                req.replicaset_id,
+                req.advertise_address,
+                req.failure_domain,
+            )
+            .map_err(RaftError::ConfChangeError)?;
+        let peer_address = traft::PeerAddress {
+            raft_id: instance.raft_id,
+            address,
         };
-        let (lc, notify) = self.schedule_notification();
-        let ctx = traft::EntryContextNormal::new(lc, OpPersistInstance::new(instance));
+        let op_addr = OpDML::replace(ClusterwideSpace::Address, &peer_address).expect("can't fail");
+        let op_instance = OpPersistInstance::new(instance);
+        // Important! Calling `raw_node.propose()` may result in
+        // `ProposalDropped` error, but the topology has already been
+        // modified. The correct handling of this case should be the
+        // following.
+        //
+        // The `topology_cache` should be preserved. It won't be fully
+        // consistent anymore, but that's bearable. (TODO: examine how
+        // the particular requests are handled). At least it doesn't
+        // much differ from the case of overriding the entry due to a
+        // re-election.
+        //
+        // On the other hand, dropping topology_cache may be much more
+        // harmful. Loss of the uncommitted entries could result in
+        // assigning the same `raft_id` to two different nodes.
+        Ok((
+            self.propose_async(op_addr)?,
+            self.propose_async(op_instance)?,
+        ))
+    }
 
+    /// Processes the [`update_instance::Request`] and appends an [`Op::PersistInstance`]
+    /// entry to the raft log (if successful).
+    ///
+    /// Returns an error if the callee node isn't a Raft leader.
+    ///
+    /// **This function doesn't yield**
+    pub fn process_update_instance_request_async(
+        &mut self,
+        req: update_instance::Request,
+    ) -> traft::Result<Notify> {
+        let topology = self.topology_mut()?;
+        let instance = topology
+            .update_instance(req)
+            .map_err(RaftError::ConfChangeError)?;
         // Important! Calling `raw_node.propose()` may result in
         // `ProposalDropped` error, but the topology has already been
         // modified. The correct handling of this case should be the
@@ -529,9 +554,7 @@ impl NodeImpl {
         // harmful. Loss of the uncommitted entries could result in
         // assigning the same `raft_id` to a two different nodes.
         //
-        self.raw_node.propose(ctx.to_bytes(), vec![])?;
-
-        Ok((notify_for_address, notify))
+        Ok(self.propose_async(OpPersistInstance::new(instance))?)
     }
 
     fn propose_conf_change_async(
@@ -1122,7 +1145,7 @@ fn raft_conf_change_loop(
                 "current_grade" => %req.current_grade.expect("just set"),
                 "instance_id" => %req.instance_id,
             );
-            if let Err(e) = node.handle_topology_request_and_wait(req.into()) {
+            if let Err(e) = node.handle_update_instance_request_and_wait(req) {
                 tlog!(Warning,
                     "failed handling update_instance::Request: {e}";
                     "instance_id" => %instance.instance_id,
@@ -1217,12 +1240,11 @@ fn raft_conf_change_loop(
                     ));
                 global()
                     .expect("can't be deinitialized")
-                    .handle_topology_request_and_wait(req.into())
+                    .handle_update_instance_request_and_wait(req)
             });
             match res {
-                Ok(instance) => {
+                Ok(()) => {
                     tlog!(Info, "raft sync processed");
-                    debug_assert!(instance.current_grade == CurrentGradeVariant::RaftSynced);
                 }
                 Err(e) => {
                     tlog!(Warning, "raft sync failed: {e}";
@@ -1282,7 +1304,7 @@ fn raft_conf_change_loop(
                     .with_current_grade(CurrentGrade::replicated(
                         instance.target_grade.incarnation,
                     ));
-                node.handle_topology_request_and_wait(req.into())?;
+                node.handle_update_instance_request_and_wait(req)?;
 
                 Ok(())
             })();
@@ -1378,7 +1400,7 @@ fn raft_conf_change_loop(
                     .with_current_grade(CurrentGrade::sharding_initialized(
                         instance.target_grade.incarnation,
                     ));
-                node.handle_topology_request_and_wait(req.into())?;
+                node.handle_update_instance_request_and_wait(req)?;
 
                 if !vshard_bootstrapped {
                     // TODO: if this fails, it will only rerun next time vshard
@@ -1483,7 +1505,7 @@ fn raft_conf_change_loop(
                             .with_current_grade(CurrentGrade::online(
                                 instance.target_grade.incarnation,
                             ));
-                    node.handle_topology_request_and_wait(req.into())?;
+                    node.handle_update_instance_request_and_wait(req)?;
                     Ok(())
                 })()
             } else {
@@ -1503,7 +1525,7 @@ fn raft_conf_change_loop(
                         let cluster_id = cluster_id.clone();
                         let req = update_instance::Request::new(instance_id.clone(), cluster_id)
                             .with_current_grade(CurrentGrade::online(target_grade.incarnation));
-                        node.handle_topology_request_and_wait(req.into())?;
+                        node.handle_update_instance_request_and_wait(req)?;
                         // TODO: change `Info` to `Debug`
                         tlog!(Info, "instance is online"; "instance_id" => %instance_id);
                     }
diff --git a/src/traft/rpc/expel.rs b/src/traft/rpc/expel.rs
index 8e73d9027c..c589ece43e 100644
--- a/src/traft/rpc/expel.rs
+++ b/src/traft/rpc/expel.rs
@@ -22,9 +22,9 @@ crate::define_rpc_request! {
             return Err(Error::NotALeader);
         }
 
-        let req2 = update_instance::Request::new(req.instance_id, req.cluster_id)
+        let req = update_instance::Request::new(req.instance_id, req.cluster_id)
             .with_target_grade(traft::TargetGradeVariant::Expelled);
-        node.handle_topology_request_and_wait(req2.into())?;
+        node.handle_update_instance_request_and_wait(req)?;
 
         Ok(Response {})
     }
diff --git a/src/traft/rpc/join.rs b/src/traft/rpc/join.rs
index 109305c6e6..b3f95f1ddc 100644
--- a/src/traft/rpc/join.rs
+++ b/src/traft/rpc/join.rs
@@ -27,7 +27,7 @@ crate::define_rpc_request! {
             });
         }
 
-        match node.handle_topology_request_and_wait(req.into()) {
+        match node.handle_join_request_and_wait(req) {
             Ok(instance) => {
                 let mut box_replication = vec![];
                 for replica in node.storage.instances.replicaset_instances(&instance.replicaset_id)? {
diff --git a/src/traft/rpc/update_instance.rs b/src/traft/rpc/update_instance.rs
index f96410f9ce..88092c8bd7 100644
--- a/src/traft/rpc/update_instance.rs
+++ b/src/traft/rpc/update_instance.rs
@@ -28,7 +28,7 @@ crate::define_rpc_request! {
                 "current_grade" => %current_grade,
             );
         }
-        match node.handle_topology_request_and_wait(req.into()) {
+        match node.handle_update_instance_request_and_wait(req) {
             Ok(_) => Ok(Response::Ok {}),
             Err(Error::NotALeader) => Ok(Response::ErrNotALeader),
             Err(e) => Err(e),
-- 
GitLab