From 74644fdde55e09f14fbe06e5bbfb516e9a827ff7 Mon Sep 17 00:00:00 2001
From: Egor Ivkov <e.o.ivkov@gmail.com>
Date: Mon, 14 Aug 2023 17:11:17 +0300
Subject: [PATCH] refactor: move handle join/update closer to related procs

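handle_join_request_and_wait and handle_update_instance_request_and_wait
were defined on traft::node::Node, far away from the related stored
procedures. Move them into rpc::join and rpc::update_instance as free
functions next to proc_raft_join and proc_update_instance. The functions
now look up the global node themselves, so call sites change from

    node.handle_update_instance_request_and_wait(req, TIMEOUT)?;

to

    handle_update_instance_request_and_wait(req, TIMEOUT)?;

While here, proc_update_instance now rejects requests that attempt to
set current_grade instead of merely logging a warning.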
---
 src/governor/mod.rs        |  11 +--
 src/on_shutdown.rs         |   3 +-
 src/rpc/expel.rs           |   3 +-
 src/rpc/join.rs            | 127 +++++++++++++++++++++++++-----
 src/rpc/update_instance.rs |  95 ++++++++++++++++++-----
 src/traft/node.rs          | 154 -------------------------------------
 6 files changed, 194 insertions(+), 199 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 0f61d4cb03..c9747c97ae 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -11,6 +11,7 @@ use crate::instance::Instance;
 use crate::op::Op;
 use crate::r#loop::FlowControl::{self, Continue};
 use crate::rpc;
+use crate::rpc::update_instance::handle_update_instance_request_and_wait;
 use crate::storage::Clusterwide;
 use crate::storage::ToEntryIter as _;
 use crate::tlog;
@@ -222,7 +223,7 @@ impl Loop {
                         "current_grade" => %current_grade,
                     ]
                     async {
-                        node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
+                        handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
                     }
                 }
             }
@@ -304,7 +305,7 @@ impl Loop {
                         "current_grade" => %current_grade,
                     ]
                     async {
-                        node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
+                        handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
                     }
                 }
             }
@@ -347,7 +348,7 @@ impl Loop {
                         "current_grade" => %current_grade,
                     ]
                     async {
-                        node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
+                        handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
                     }
                 }
             }
@@ -385,7 +386,7 @@ impl Loop {
                         "current_grade" => %current_grade,
                     ]
                     async {
-                        node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
+                        handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
                     }
                 }
             }
@@ -439,7 +440,7 @@ impl Loop {
                         "current_grade" => %current_grade,
                     ]
                     async {
-                        node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
+                        handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
                     }
                 }
             }
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index e854a454a3..d460c15d5d 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -5,6 +5,7 @@ use ::tarantool::fiber;
 use crate::has_grades;
 use crate::instance::grade::TargetGradeVariant;
 use crate::rpc;
+use crate::rpc::update_instance::handle_update_instance_request_and_wait;
 use crate::storage::ClusterwideSpaceId;
 use crate::tlog;
 use crate::traft;
@@ -80,7 +81,7 @@ fn go_offline() -> traft::Result<()> {
         let now = Instant::now();
         let wait_before_retry = Duration::from_millis(300);
 
-        match node.handle_update_instance_request_and_wait(req.clone(), wait_before_retry) {
+        match handle_update_instance_request_and_wait(req.clone(), wait_before_retry) {
             Ok(_) => break Ok(()),
             Err(e) => {
                 tlog!(Warning,
diff --git a/src/rpc/expel.rs b/src/rpc/expel.rs
index a1d3e6b9e5..418db90d39 100644
--- a/src/rpc/expel.rs
+++ b/src/rpc/expel.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
 use crate::instance::grade::TargetGradeVariant;
 use crate::instance::InstanceId;
 use crate::rpc;
+use crate::rpc::update_instance::handle_update_instance_request_and_wait;
 use crate::traft::Result;
 use crate::traft::{error::Error, node};
 
@@ -35,7 +36,7 @@ crate::define_rpc_request! {
 
         let req = rpc::update_instance::Request::new(req.instance_id, req.cluster_id)
             .with_target_grade(TargetGradeVariant::Expelled);
-        node.handle_update_instance_request_and_wait(req, TIMEOUT)?;
+        handle_update_instance_request_and_wait(req, TIMEOUT)?;
 
         Ok(Response {})
     }
diff --git a/src/rpc/join.rs b/src/rpc/join.rs
index 3077ad4077..077a8aaa48 100644
--- a/src/rpc/join.rs
+++ b/src/rpc/join.rs
@@ -1,11 +1,18 @@
 use std::time::Duration;
 
+use crate::cas;
 use crate::failure_domain::FailureDomain;
+use crate::instance::replication_ids;
 use crate::instance::{Instance, InstanceId};
 use crate::replicaset::ReplicasetId;
 use crate::storage::ToEntryIter as _;
+use crate::storage::{ClusterwideSpaceId, PropertyName};
+use crate::traft;
+use crate::traft::op::{Dml, Op};
 use crate::traft::{error::Error, node, Address, PeerAddress, Result};
 
+use ::tarantool::fiber;
+
 const TIMEOUT: Duration = Duration::from_secs(10);
 
 crate::define_rpc_request! {
@@ -22,25 +29,7 @@ crate::define_rpc_request! {
     /// 4. Compare and swap request to commit new instance and its address failed
     /// with an error that cannot be retried.
     fn proc_raft_join(req: Request) -> Result<Response> {
-        let node = node::global()?;
-        let cluster_id = node.raft_storage.cluster_id()?;
-
-        if req.cluster_id != cluster_id {
-            return Err(Error::ClusterIdMismatch {
-                instance_cluster_id: req.cluster_id,
-                cluster_cluster_id: cluster_id,
-            });
-        }
-
-        let (instance, replication_addresses) = node.handle_join_request_and_wait(req, TIMEOUT)?;
-        // A joined instance needs to communicate with other nodes.
-        // TODO: limit the number of entries sent to reduce response size.
-        let peer_addresses = node.storage.peer_addresses.iter()?.collect();
-
-        Ok(Response {
-            instance,
-            peer_addresses,
-            box_replication: replication_addresses.into_iter().collect()})
+        handle_join_request_and_wait(req, TIMEOUT)
     }
 
     /// Request to join the cluster.
@@ -63,3 +52,103 @@ crate::define_rpc_request! {
         pub box_replication: Vec<Address>,
     }
 }
+
+/// Processes the [`Request`] and appends the necessary
+/// entries to the raft log (if successful).
+///
+/// Returns the [`Response`] containing the resulting [`Instance`] when the entry is committed.
+// TODO: to make this function async and have an outer timeout,
+// wait_* fns also need to be async.
+pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<Response> {
+    let node = node::global()?;
+    let cluster_id = node.raft_storage.cluster_id()?;
+    let storage = &node.storage;
+    let raft_storage = &node.raft_storage;
+
+    if req.cluster_id != cluster_id {
+        return Err(Error::ClusterIdMismatch {
+            instance_cluster_id: req.cluster_id,
+            cluster_cluster_id: cluster_id,
+        });
+    }
+
+    let deadline = fiber::clock().saturating_add(timeout);
+    loop {
+        let instance = Instance::new(
+            req.instance_id.as_ref(),
+            req.replicaset_id.as_ref(),
+            &req.failure_domain,
+            storage,
+        )
+        .map_err(raft::Error::ConfChangeError)?;
+        let mut replication_addresses = storage
+            .peer_addresses
+            .addresses_by_ids(replication_ids(&instance.replicaset_id, storage))?;
+        replication_addresses.insert(req.advertise_address.clone());
+        let peer_address = traft::PeerAddress {
+            raft_id: instance.raft_id,
+            address: req.advertise_address.clone(),
+        };
+        let op_addr = Dml::replace(ClusterwideSpaceId::Address, &peer_address)
+            .expect("encoding should not fail");
+        let op_instance = Dml::replace(ClusterwideSpaceId::Instance, &instance)
+            .expect("encoding should not fail");
+        let ranges = vec![
+            cas::Range::new(ClusterwideSpaceId::Instance),
+            cas::Range::new(ClusterwideSpaceId::Address),
+            cas::Range::new(ClusterwideSpaceId::Property).eq((PropertyName::ReplicationFactor,)),
+        ];
+        macro_rules! handle_result {
+            ($res:expr) => {
+                match $res {
+                    Ok((index, term)) => {
+                        node.wait_index(index, deadline.duration_since(fiber::clock()))?;
+                        if term != raft::Storage::term(raft_storage, index)? {
+                            // leader switched - retry
+                            node.wait_status();
+                            continue;
+                        }
+                    }
+                    Err(err) => {
+                        if err.is_cas_err() || err.is_term_mismatch_err() {
+                            // cas error - retry
+                            fiber::sleep(Duration::from_millis(500));
+                            continue;
+                        } else {
+                            return Err(err);
+                        }
+                    }
+                }
+            };
+        }
+        // Only in this order, so that whenever an instance exists, its address is guaranteed to be there too.
+        handle_result!(cas::compare_and_swap(
+            Op::Dml(op_addr),
+            cas::Predicate {
+                index: raft_storage.applied()?,
+                term: raft_storage.term()?,
+                ranges: ranges.clone(),
+            },
+            deadline.duration_since(fiber::clock()),
+        ));
+        handle_result!(cas::compare_and_swap(
+            Op::Dml(op_instance),
+            cas::Predicate {
+                index: raft_storage.applied()?,
+                term: raft_storage.term()?,
+                ranges,
+            },
+            deadline.duration_since(fiber::clock()),
+        ));
+        node.main_loop.wakeup();
+
+        // A joined instance needs to communicate with other nodes.
+        // TODO: limit the number of entries sent to reduce response size.
+        let peer_addresses = node.storage.peer_addresses.iter()?.collect();
+        return Ok(Response {
+            instance: instance.into(),
+            peer_addresses,
+            box_replication: replication_addresses.into_iter().collect(),
+        });
+    }
+}
diff --git a/src/rpc/update_instance.rs b/src/rpc/update_instance.rs
index 29fbef0502..6e9226b327 100644
--- a/src/rpc/update_instance.rs
+++ b/src/rpc/update_instance.rs
@@ -1,12 +1,16 @@
 use std::time::Duration;
 
+use crate::cas;
 use crate::failure_domain::FailureDomain;
 use crate::instance::grade::{CurrentGrade, TargetGradeVariant};
 use crate::instance::InstanceId;
-use crate::tlog;
+use crate::storage::{ClusterwideSpaceId, PropertyName};
+use crate::traft::op::{Dml, Op};
 use crate::traft::Result;
 use crate::traft::{error::Error, node};
 
+use ::tarantool::fiber;
+
 const TIMEOUT: Duration = Duration::from_secs(10);
 
 crate::define_rpc_request! {
@@ -23,25 +27,10 @@ crate::define_rpc_request! {
     /// 4. Compare and swap request to commit updated instance failed
     /// with an error that cannot be retried.
     fn proc_update_instance(req: Request) -> Result<Response> {
-        let node = node::global()?;
-        let cluster_id = node.raft_storage.cluster_id()?;
-
-        if req.cluster_id != cluster_id {
-            return Err(Error::ClusterIdMismatch {
-                instance_cluster_id: req.cluster_id,
-                cluster_cluster_id: cluster_id,
-            });
-        }
-
-        let mut req = req;
-        let instance_id = &*req.instance_id;
-        if let Some(current_grade) = req.current_grade.take() {
-            tlog!(Warning, "attempt to change current_grade for instance";
-                "instance_id" => instance_id,
-                "current_grade" => %current_grade,
-            );
+        if req.current_grade.is_some() {
+            return Err(Error::Other("Changing current grade through the proc API is not allowed.".into()));
         }
-        node.handle_update_instance_request_and_wait(req, TIMEOUT)?;
+        handle_update_instance_request_and_wait(req, TIMEOUT)?;
         Ok(Response {})
     }
 
@@ -85,3 +74,71 @@ impl Request {
         self
     }
 }
+
+/// Processes the [`Request`] and appends
+/// the corresponding [`Op::Dml`] entry to the raft log (if successful).
+///
+/// Returns `Ok(())` when the entry is committed.
+///
+/// **This function yields**
+// TODO: for this function to be async and have an outer timeout, wait_* fns need to be async
+pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration) -> Result<()> {
+    let node = node::global()?;
+    let cluster_id = node.raft_storage.cluster_id()?;
+    let storage = &node.storage;
+    let raft_storage = &node.raft_storage;
+
+    if req.cluster_id != cluster_id {
+        return Err(Error::ClusterIdMismatch {
+            instance_cluster_id: req.cluster_id,
+            cluster_cluster_id: cluster_id,
+        });
+    }
+
+    let deadline = fiber::clock().saturating_add(timeout);
+    loop {
+        let instance = storage
+            .instances
+            .get(&req.instance_id)?
+            .update(&req, storage)
+            .map_err(raft::Error::ConfChangeError)?;
+        let dml = Dml::replace(ClusterwideSpaceId::Instance, &instance)
+            .expect("encoding should not fail");
+
+        let ranges = vec![
+            cas::Range::new(ClusterwideSpaceId::Instance),
+            cas::Range::new(ClusterwideSpaceId::Address),
+            cas::Range::new(ClusterwideSpaceId::Property).eq((PropertyName::ReplicationFactor,)),
+        ];
+        let res = cas::compare_and_swap(
+            Op::Dml(dml),
+            cas::Predicate {
+                index: raft_storage.applied()?,
+                term: raft_storage.term()?,
+                ranges,
+            },
+            deadline.duration_since(fiber::clock()),
+        );
+        match res {
+            Ok((index, term)) => {
+                node.wait_index(index, deadline.duration_since(fiber::clock()))?;
+                if term != raft::Storage::term(raft_storage, index)? {
+                    // leader switched - retry
+                    node.wait_status();
+                    continue;
+                }
+            }
+            Err(err) => {
+                if err.is_cas_err() || err.is_term_mismatch_err() {
+                    // cas error - retry
+                    fiber::sleep(Duration::from_millis(500));
+                    continue;
+                } else {
+                    return Err(err);
+                }
+            }
+        }
+        node.main_loop.wakeup();
+        return Ok(());
+    }
+}
diff --git a/src/traft/node.rs b/src/traft/node.rs
index b3c1846bb3..29724ce364 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -5,10 +5,8 @@
 //! - handling configuration changes,
 //! - processing raft `Ready` - persisting entries, communicating with other raft nodes.
 
-use crate::cas;
 use crate::governor;
 use crate::has_grades;
-use crate::instance::replication_ids;
 use crate::instance::Instance;
 use crate::kvcell::KVCell;
 use crate::loop_start;
@@ -30,7 +28,6 @@ use crate::traft::event;
 use crate::traft::event::Event;
 use crate::traft::notify::{notification, Notifier, Notify};
 use crate::traft::op::{Acl, Ddl, Dml, Op, OpResult};
-use crate::traft::Address;
 use crate::traft::ConnectionPool;
 use crate::traft::ContextCoercion as _;
 use crate::traft::LogicalClock;
@@ -324,157 +321,6 @@ impl Node {
         })
     }
 
-    /// Processes the [`rpc::join::Request`] and appends necessary
-    /// entries to the raft log (if successful).
-    ///
-    /// Returns the resulting [`Instance`] when the entry is committed.
-    // TODO: to make this function async and have an outer timeout,
-    // wait_* fns also need to be async.
-    pub fn handle_join_request_and_wait(
-        &self,
-        req: rpc::join::Request,
-        timeout: Duration,
-    ) -> traft::Result<(Box<Instance>, HashSet<Address>)> {
-        let deadline = fiber::clock().saturating_add(timeout);
-
-        loop {
-            let instance = Instance::new(
-                req.instance_id.as_ref(),
-                req.replicaset_id.as_ref(),
-                &req.failure_domain,
-                &self.storage,
-            )
-            .map_err(RaftError::ConfChangeError)?;
-            let mut replication_addresses = self
-                .storage
-                .peer_addresses
-                .addresses_by_ids(replication_ids(&instance.replicaset_id, &self.storage))?;
-            replication_addresses.insert(req.advertise_address.clone());
-            let peer_address = traft::PeerAddress {
-                raft_id: instance.raft_id,
-                address: req.advertise_address.clone(),
-            };
-            let op_addr = Dml::replace(ClusterwideSpaceId::Address, &peer_address)
-                .expect("encoding should not fail");
-            let op_instance = Dml::replace(ClusterwideSpaceId::Instance, &instance)
-                .expect("encoding should not fail");
-            let ranges = vec![
-                cas::Range::new(ClusterwideSpaceId::Instance),
-                cas::Range::new(ClusterwideSpaceId::Address),
-                cas::Range::new(ClusterwideSpaceId::Property)
-                    .eq((PropertyName::ReplicationFactor,)),
-            ];
-            macro_rules! handle_result {
-                ($res:expr) => {
-                    match $res {
-                        Ok((index, term)) => {
-                            self.wait_index(index, deadline.duration_since(fiber::clock()))?;
-                            if term != raft::Storage::term(&self.raft_storage, index)? {
-                                // leader switched - retry
-                                self.wait_status();
-                                continue;
-                            }
-                        }
-                        Err(err) => {
-                            if err.is_cas_err() | err.is_term_mismatch_err() {
-                                // cas error - retry
-                                fiber::sleep(Duration::from_millis(500));
-                                continue;
-                            } else {
-                                return Err(err);
-                            }
-                        }
-                    }
-                };
-            }
-            // Only in this order - so that when instance exists - address will always be there.
-            handle_result!(cas::compare_and_swap(
-                Op::Dml(op_addr),
-                cas::Predicate {
-                    index: self.raft_storage.applied()?,
-                    term: self.raft_storage.term()?,
-                    ranges: ranges.clone(),
-                },
-                deadline.duration_since(fiber::clock()),
-            ));
-            handle_result!(cas::compare_and_swap(
-                Op::Dml(op_instance),
-                cas::Predicate {
-                    index: self.raft_storage.applied()?,
-                    term: self.raft_storage.term()?,
-                    ranges,
-                },
-                deadline.duration_since(fiber::clock()),
-            ));
-
-            self.main_loop.wakeup();
-            return Ok((instance.into(), replication_addresses));
-        }
-    }
-
-    /// Processes the [`rpc::update_instance::Request`] and appends
-    /// the corresponding [`Op::Dml`] entry to the raft log (if successful).
-    ///
-    /// Returns `Ok(())` when the entry is committed.
-    ///
-    /// **This function yields**
-    // TODO: for this function to be async and have an outer timeout wait_* fns need to be async
-    pub fn handle_update_instance_request_and_wait(
-        &self,
-        req: rpc::update_instance::Request,
-        timeout: Duration,
-    ) -> traft::Result<()> {
-        let deadline = fiber::clock().saturating_add(timeout);
-
-        loop {
-            let instance = self
-                .storage
-                .instances
-                .get(&req.instance_id)?
-                .update(&req, &self.storage)
-                .map_err(RaftError::ConfChangeError)?;
-            let dml = Dml::replace(ClusterwideSpaceId::Instance, &instance)
-                .expect("encoding should not fail");
-
-            let ranges = vec![
-                cas::Range::new(ClusterwideSpaceId::Instance),
-                cas::Range::new(ClusterwideSpaceId::Address),
-                cas::Range::new(ClusterwideSpaceId::Property)
-                    .eq((PropertyName::ReplicationFactor,)),
-            ];
-            let res = cas::compare_and_swap(
-                Op::Dml(dml),
-                cas::Predicate {
-                    index: self.raft_storage.applied()?,
-                    term: self.raft_storage.term()?,
-                    ranges,
-                },
-                deadline.duration_since(fiber::clock()),
-            );
-            match res {
-                Ok((index, term)) => {
-                    self.wait_index(index, deadline.duration_since(fiber::clock()))?;
-                    if term != raft::Storage::term(&self.raft_storage, index)? {
-                        // leader switched - retry
-                        self.wait_status();
-                        continue;
-                    }
-                }
-                Err(err) => {
-                    if err.is_cas_err() | err.is_term_mismatch_err() {
-                        // cas error - retry
-                        fiber::sleep(Duration::from_millis(500));
-                        continue;
-                    } else {
-                        return Err(err);
-                    }
-                }
-            }
-            self.main_loop.wakeup();
-            return Ok(());
-        }
-    }
-
     /// Only the conf_change_loop on a leader is eligible to call this function.
     ///
     /// **This function yields**
-- 
GitLab