From 40eb3185b390282af68b0959c5c25570a1365784 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 14 Dec 2022 12:06:37 +0300
Subject: [PATCH] refactor(governor): construct requests with plan

---
 src/governor/mod.rs | 68 +++++++++++++++++++++++++++++++--------------
 1 file changed, 47 insertions(+), 21 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 8e07b31b6f..2c45edbec5 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -21,6 +21,7 @@ use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
 use crate::traft::rpc::{replication, sharding, sync, update_instance};
 use crate::traft::OpDML;
 use crate::traft::RaftId;
+use crate::traft::RaftTerm;
 use crate::traft::ReplicasetId;
 use crate::traft::Result;
 use crate::traft::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant};
@@ -67,6 +68,7 @@ impl Loop {
         let node = global().expect("must be initialized");
 
         let plan = action_plan(
+            term,
             instances,
             &voters,
             &learners,
@@ -83,6 +85,7 @@ impl Loop {
                 return Continue;
             }
         );
+
         if let Plan::ConfChange(conf_change) = plan {
             // main_loop gives the warranty that every ProposeConfChange
             // will sometimes be handled and there's no need in timeout.
@@ -95,25 +98,22 @@ impl Loop {
             }
             return Continue;
         }
+
         if let Plan::TransferLeadership(TransferLeadership { to }) = plan {
             tlog!(Info, "transferring leadership to {}", to.instance_id);
             node.transfer_leadership_and_yield(to.raft_id);
             event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
             return Continue;
         }
-        if let Plan::TransferMastership(TransferMastership { to }) = plan {
+
+        if let Plan::TransferMastership(TransferMastership { to, rpc, op }) = plan {
             #[rustfmt::skip]
             let Instance { instance_id, replicaset_id, .. } = to;
             tlog!(Info, "transferring replicaset mastership to {instance_id}");
 
             let res: Result<_> = async {
                 tlog!(Info, "promoting new master");
-                let req = replication::promote::Request {
-                    term,
-                    commit: raft_storage.commit()?.unwrap(),
-                    timeout: Self::SYNC_TIMEOUT,
-                };
-                pool.call(instance_id, &req)?
+                pool.call(instance_id, &rpc)?
                     // TODO: don't hard code timeout
                     .timeout(Duration::from_secs(3))
                     .await??;
@@ -131,9 +131,6 @@ impl Loop {
 
             let res: Result<_> = async {
                 tlog!(Info, "proposing replicaset master change");
-                let mut ops = UpdateOps::new();
-                ops.assign("master_id", instance_id)?;
-                let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?;
                 // TODO: don't hard code the timeout
                 node.propose_and_wait(op, Duration::from_secs(3))??;
                 Ok(())
@@ -678,14 +675,15 @@ impl Loop {
     }
 }
 
-#[allow(unused)]
+#[allow(clippy::too_many_arguments)]
 fn action_plan<'i>(
+    term: RaftTerm,
     instances: &'i [Instance],
     voters: &[RaftId],
     learners: &[RaftId],
     replicasets: &HashMap<&ReplicasetId, &'i Replicaset>,
     my_raft_id: RaftId,
-    storage: &Clusterwide,
+    _storage: &Clusterwide,
     raft_storage: &RaftSpaceAccess,
 ) -> Result<Plan<'i>> {
     if let Some(conf_change) = raft_conf_change(instances, voters, learners) {
@@ -733,10 +731,16 @@ fn action_plan<'i>(
         let replicaset = replicasets.get(replicaset_id);
         if matches!(replicaset, Some(replicaset) if replicaset.master_id == instance_id) {
             let new_master = maybe_responding(instances).find(|p| p.replicaset_id == replicaset_id);
-            if let Some(new_master) = new_master {
-                return Ok(Plan::TransferMastership(TransferMastership {
-                    to: new_master,
-                }));
+            if let Some(to) = new_master {
+                let rpc = replication::promote::Request {
+                    term,
+                    commit: raft_storage.commit()?.unwrap(),
+                    timeout: Loop::SYNC_TIMEOUT,
+                };
+                let mut ops = UpdateOps::new();
+                ops.assign("master_id", &to.instance_id)?;
+                let op = OpDML::update(ClusterwideSpace::Replicaset, &[&to.replicaset_id], ops)?;
+                return Ok(TransferMastership { to, rpc, op }.into());
             };
         } else {
             tlog!(Warning, "replicaset master is going offline and no substitution is found";
@@ -870,8 +874,9 @@ fn maybe_responding(instances: &[Instance]) -> impl Iterator<Item = &Instance> {
 
 mod actions {
     use super::*;
+    use raft::prelude::ConfChangeV2;
 
-    impl Actions for raft::prelude::ConfChangeV2 {
+    impl Actions for ConfChangeV2 {
         type Actions = Self;
     }
 
@@ -884,7 +889,10 @@ mod actions {
 
     pub struct TransferMastership<'i> {
         pub to: &'i Instance,
+        pub rpc: replication::promote::Request,
+        pub op: OpDML,
     }
+
     impl<'i> Actions for TransferMastership<'i> {
         type Actions = (replication::promote::Request, replicaset::update::Master);
     }
@@ -894,11 +902,11 @@ mod actions {
         type Actions;
     }
 
-    pub enum Plan<'a> {
+    pub enum Plan<'i> {
         None,
-        ConfChange(raft::prelude::ConfChangeV2),
-        TransferLeadership(TransferLeadership<'a>),
-        TransferMastership(TransferMastership<'a>),
+        ConfChange(ConfChangeV2),
+        TransferLeadership(TransferLeadership<'i>),
+        TransferMastership(TransferMastership<'i>),
     }
 
     mod replicaset {
@@ -906,4 +914,22 @@ mod actions {
             pub struct Master;
         }
     }
+
+    impl From<ConfChangeV2> for Plan<'_> {
+        fn from(a: ConfChangeV2) -> Self {
+            Self::ConfChange(a)
+        }
+    }
+
+    impl<'i> From<TransferLeadership<'i>> for Plan<'i> {
+        fn from(a: TransferLeadership<'i>) -> Self {
+            Self::TransferLeadership(a)
+        }
+    }
+
+    impl<'i> From<TransferMastership<'i>> for Plan<'i> {
+        fn from(a: TransferMastership<'i>) -> Self {
+            Self::TransferMastership(a)
+        }
+    }
 }
-- 
GitLab