From 280816eba50d6afb280d9e68a7dac26a5fd2dd59 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 14 Dec 2022 16:50:58 +0300
Subject: [PATCH] refactor(governor): plan for creating replicaset

---
 src/governor/mod.rs | 169 +++++++++++++++++++++-----------------------
 1 file changed, 79 insertions(+), 90 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index ed9fa645a7..83c6240782 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -87,6 +87,7 @@ impl Loop {
         let commit = raft_storage.commit().unwrap().unwrap();
         let cluster_id = raft_storage.cluster_id().unwrap().unwrap();
         let node = global().expect("must be initialized");
+        let vshard_bootstrapped = storage.properties.vshard_bootstrapped().unwrap();
 
         let plan = action_plan(
             term,
@@ -97,6 +98,7 @@ impl Loop {
             &learners,
             &replicasets,
             node.raft_id,
+            vshard_bootstrapped,
         );
         let plan = unwrap_ok_or!(plan,
             Err(e) => {
@@ -218,103 +220,41 @@ impl Loop {
                 }
             }
 
-            Plan::None => {
-                tlog!(Info, "nothing to do");
-                did_something = false;
-            }
-        }
-
-        if did_something {
-            return Continue;
-        }
-
-        ////////////////////////////////////////////////////////////////////////
-        // create new replicaset
-        if let Some(to_create_replicaset) = instances
-            .iter()
-            .filter(|instance| {
-                instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online)
-            })
-            .find_map(
-                |instance| match storage.replicasets.get(&instance.replicaset_id) {
-                    Err(e) => Some(Err(e)),
-                    Ok(None) => Some(Ok(instance)),
-                    Ok(_) => None,
-                },
-            )
-        {
-            // TODO: what if this is not actually the replicaset bootstrap leader?
-            let Instance {
-                instance_id,
+            Plan::CreateReplicaset(CreateReplicaset {
+                master_id,
                 replicaset_id,
-                replicaset_uuid,
-                ..
-            } = unwrap_ok_or!(to_create_replicaset,
-                Err(e) => {
-                    tlog!(Warning, "{e}");
-
-                    // TODO: don't hard code timeout
-                    event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300)).unwrap();
-                    return Continue;
+                rpc,
+                op,
+            }) => {
+                governor_step! {
+                    "promoting new replicaset master" [
+                        "master_id" => %master_id,
+                        "replicaset_id" => %replicaset_id,
+                    ]
+                    async {
+                        pool.call(master_id, &rpc)?
+                            .timeout(Duration::from_secs(3))
+                            .await??
+                    }
                 }
-            );
 
-            // TODO: change `Info` to `Debug`
-            tlog!(Info, "promoting new replicaset master";
-                "master_id" => %instance_id,
-                "replicaset_id" => %replicaset_id,
-            );
-            let res: Result<_> = async {
-                let req = replication::promote::Request {
-                    term,
-                    commit: raft_storage.commit()?.unwrap(),
-                    timeout: Self::SYNC_TIMEOUT,
-                };
-                let replication::promote::Response {} = pool
-                    .call(instance_id, &req)?
-                    // TODO: don't hard code the timeout
-                    .timeout(Duration::from_secs(3))
-                    .await??;
-                Ok(())
-            }
-            .await;
-            if let Err(e) = res {
-                tlog!(Warning, "failed promoting new replicaset master: {e}";
-                    "master_id" => %instance_id,
-                    "replicaset_id" => %replicaset_id,
-                );
-                return Continue;
+                governor_step! {
+                    "creating new replicaset" [
+                        "replicaset_id" => %replicaset_id,
+                    ]
+                    async {
+                        node.propose_and_wait(op, Duration::from_secs(3))??;
+                    }
+                }
             }
 
-            // TODO: change `Info` to `Debug`
-            tlog!(Info, "creating new replicaset";
-                "master_id" => %instance_id,
-                "replicaset_id" => %replicaset_id,
-            );
-            let res: Result<_> = async {
-                let vshard_bootstrapped = storage.properties.vshard_bootstrapped()?;
-                let req = OpDML::insert(
-                    ClusterwideSpace::Replicaset,
-                    &Replicaset {
-                        replicaset_id: replicaset_id.clone(),
-                        replicaset_uuid: replicaset_uuid.clone(),
-                        master_id: instance_id.clone(),
-                        weight: if vshard_bootstrapped { 0. } else { 1. },
-                        current_schema_version: 0,
-                    },
-                )?;
-                // TODO: don't hard code the timeout
-                node.propose_and_wait(req, Duration::from_secs(3))??;
-                Ok(())
-            }
-            .await;
-            if let Err(e) = res {
-                tlog!(Warning, "failed creating new replicaset: {e}";
-                    "replicaset_id" => %replicaset_id,
-                );
-                return Continue;
+            Plan::None => {
+                tlog!(Info, "nothing to do");
+                did_something = false;
             }
+        }
 
+        if did_something {
             return Continue;
         }
 
@@ -645,6 +585,7 @@ fn action_plan<'i>(
     learners: &[RaftId],
     replicasets: &HashMap<&ReplicasetId, &'i Replicaset>,
     my_raft_id: RaftId,
+    vshard_bootstrapped: bool,
 ) -> Result<Plan<'i>> {
     ////////////////////////////////////////////////////////////////////////////
     // conf change
@@ -758,6 +699,40 @@ fn action_plan<'i>(
         return Ok(RaftSync { instance_id, rpc, req }.into());
     }
 
+    ////////////////////////////////////////////////////////////////////////////
+    // create new replicaset
+    let to_create_replicaset = instances
+        .iter()
+        .filter(|instance| {
+            instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online)
+        })
+        .find(|instance| replicasets.get(&instance.replicaset_id).is_none());
+    if let Some(Instance {
+        instance_id: master_id,
+        replicaset_id,
+        replicaset_uuid,
+        ..
+    }) = to_create_replicaset
+    {
+        let rpc = replication::promote::Request {
+            term,
+            commit,
+            timeout: Loop::SYNC_TIMEOUT,
+        };
+        let op = OpDML::insert(
+            ClusterwideSpace::Replicaset,
+            &Replicaset {
+                replicaset_id: replicaset_id.clone(),
+                replicaset_uuid: replicaset_uuid.clone(),
+                master_id: master_id.clone(),
+                weight: if vshard_bootstrapped { 0. } else { 1. },
+                current_schema_version: 0,
+            },
+        )?;
+        #[rustfmt::skip]
+        return Ok(CreateReplicaset { master_id, replicaset_id, rpc, op }.into());
+    }
+
     Ok(Plan::None)
 }
 
@@ -906,6 +881,13 @@ mod actions {
         pub req: update_instance::Request,
     }
 
+    pub struct CreateReplicaset<'i> {
+        pub master_id: &'i InstanceId,
+        pub replicaset_id: &'i ReplicasetId,
+        pub rpc: replication::promote::Request,
+        pub op: OpDML,
+    }
+
     pub enum Plan<'i> {
         None,
         ConfChange(ConfChangeV2),
@@ -913,6 +895,7 @@ mod actions {
         TransferMastership(TransferMastership<'i>),
         ReconfigureShardingAndDowngrade(ReconfigureShardingAndDowngrade<'i>),
         RaftSync(RaftSync<'i>),
+        CreateReplicaset(CreateReplicaset<'i>),
     }
 
     impl From<ConfChangeV2> for Plan<'_> {
@@ -944,4 +927,10 @@ mod actions {
             Self::RaftSync(a)
         }
     }
+
+    impl<'i> From<CreateReplicaset<'i>> for Plan<'i> {
+        fn from(a: CreateReplicaset<'i>) -> Self {
+            Self::CreateReplicaset(a)
+        }
+    }
 }
-- 
GitLab