From 7b18176c8b98b3dd8bdcf3b37d25ff1e39c4eb31 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 14 Dec 2022 15:49:28 +0300
Subject: [PATCH] refactor(governor): plan for raft sync

---
 src/governor/mod.rs | 97 ++++++++++++++++++++++++++-------------------
 1 file changed, 57 insertions(+), 40 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 929af1e40c..c8471b4590 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -196,6 +196,28 @@ impl Loop {
                 }
             }
 
+            Plan::RaftSync(RaftSync {
+                instance_id,
+                rpc,
+                req,
+            }) => {
+                tlog!(Info, "syncing raft log"; "instance_id" => %instance_id);
+                let res = async {
+                    let sync::Response { commit } = pool
+                        .call(instance_id, &rpc)?
+                        .timeout(Loop::SYNC_TIMEOUT)
+                        .await??;
+                    tlog!(Info, "instance's commit index is {commit}"; "instance_id" => %instance_id);
+                    node.handle_update_instance_request_and_wait(req)
+                }
+                .await;
+                if let Err(e) = res {
+                    tlog!(Warning, "failed syncing raft log: {e}"; "instance_id" => %instance_id);
+                    event::wait_timeout(Event::TopologyChanged, Duration::from_millis(250)).unwrap();
+                    return Continue;
+                }
+            }
+
             Plan::None => {
                 tlog!(Info, "nothing to do");
                 did_something = false;
@@ -206,46 +228,6 @@ impl Loop {
             return Continue;
         }
 
-        ////////////////////////////////////////////////////////////////////////
-        // raft sync
-        // TODO: putting each stage in a different function
-        // will make the control flow more readable
-        let to_sync = instances.iter().find(|instance| {
-            instance.has_grades(CurrentGradeVariant::Offline, TargetGradeVariant::Online)
-                || instance.is_reincarnated()
-        });
-        if let Some(Instance {
-            instance_id,
-            target_grade,
-            ..
-        }) = to_sync
-        {
-            // TODO: change `Info` to `Debug`
-            tlog!(Info, "syncing raft log"; "instance_id" => %instance_id);
-            let res = async {
-                let req = sync::Request {
-                    commit: raft_storage.commit().unwrap().unwrap(),
-                    timeout: Self::SYNC_TIMEOUT,
-                };
-                let sync::Response { commit } = pool.call(instance_id, &req)?.await?;
-                // TODO: change `Info` to `Debug`
-                tlog!(Info, "instance's commit index is {commit}"; "instance_id" => %instance_id);
-
-                let req = update_instance::Request::new(instance_id.clone(), cluster_id)
-                    .with_current_grade(CurrentGrade::raft_synced(target_grade.incarnation));
-                node.handle_update_instance_request_and_wait(req)
-            }
-            .await;
-            if let Err(e) = res {
-                tlog!(Warning, "failed syncing raft log: {e}"; "instance_id" => %instance_id);
-
-                // TODO: don't hard code timeout
-                event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300)).unwrap();
-            }
-
-            return Continue;
-        }
-
         ////////////////////////////////////////////////////////////////////////
         // create new replicaset
         if let Some(to_create_replicaset) = instances
@@ -755,6 +737,28 @@ fn action_plan<'i>(
         return Ok(ReconfigureShardingAndDowngrade { targets, rpc, req }.into());
     }
 
+    ////////////////////////////////////////////////////////////////////////////
+    // raft sync
+    let to_sync = instances.iter().find(|instance| {
+        instance.has_grades(CurrentGradeVariant::Offline, TargetGradeVariant::Online)
+            || instance.is_reincarnated()
+    });
+    if let Some(Instance {
+        instance_id,
+        target_grade,
+        ..
+    }) = to_sync
+    {
+        let rpc = sync::Request {
+            commit: raft_storage.commit()?.unwrap(),
+            timeout: Loop::SYNC_TIMEOUT,
+        };
+        let req = update_instance::Request::new(instance_id.clone(), cluster_id)
+            .with_current_grade(CurrentGrade::raft_synced(target_grade.incarnation));
+        #[rustfmt::skip]
+        return Ok(RaftSync { instance_id, rpc, req }.into());
+    }
+
     Ok(Plan::None)
 }
 
@@ -897,12 +901,19 @@ mod actions {
         pub req: update_instance::Request,
     }
 
+    pub struct RaftSync<'i> {
+        pub instance_id: &'i InstanceId,
+        pub rpc: sync::Request,
+        pub req: update_instance::Request,
+    }
+
     pub enum Plan<'i> {
         None,
         ConfChange(ConfChangeV2),
         TransferLeadership(TransferLeadership<'i>),
         TransferMastership(TransferMastership<'i>),
         ReconfigureShardingAndDowngrade(ReconfigureShardingAndDowngrade<'i>),
+        RaftSync(RaftSync<'i>),
     }
 
     impl From<ConfChangeV2> for Plan<'_> {
@@ -928,4 +939,10 @@ mod actions {
             Self::ReconfigureShardingAndDowngrade(a)
         }
     }
+
+    impl<'i> From<RaftSync<'i>> for Plan<'i> {
+        fn from(a: RaftSync<'i>) -> Self {
+            Self::RaftSync(a)
+        }
+    }
 }
-- 
GitLab