From 4de536f40fad76d5f7443f3a95af2368700f3cf8 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 14 Dec 2022 17:43:20 +0300
Subject: [PATCH] refactor(governor): plan for bootstrapping sharding

---
 src/governor/mod.rs | 115 ++++++++++++++++++++------------------------
 1 file changed, 51 insertions(+), 64 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index e2065d1734..d7b9d76f5d 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -88,6 +88,7 @@ impl Loop {
         let cluster_id = raft_storage.cluster_id().unwrap().unwrap();
         let node = global().expect("must be initialized");
         let vshard_bootstrapped = storage.properties.vshard_bootstrapped().unwrap();
+        let replication_factor = storage.properties.replication_factor().unwrap();
 
         let plan = action_plan(
             term,
@@ -99,6 +100,7 @@ impl Loop {
             &replicasets,
             node.raft_id,
             vshard_bootstrapped,
+            replication_factor,
         );
         let plan = unwrap_ok_or!(plan,
             Err(e) => {
@@ -335,6 +337,21 @@ impl Loop {
                 }
             }
 
+            Plan::ShardingBoot(ShardingBoot { target, rpc, op }) => {
+                governor_step! {
+                    "bootstrapping bucket distribution" [
+                        "instance_id" => %target,
+                    ]
+                    async {
+                        pool
+                            .call(target, &rpc)?
+                            .timeout(Loop::SYNC_TIMEOUT)
+                            .await??;
+                        node.propose_and_wait(op, Duration::from_secs(3))??
+                    }
+                }
+            }
+
             Plan::None => {
                 tlog!(Info, "nothing to do");
                 did_something = false;
@@ -345,59 +362,6 @@ impl Loop {
             return Continue;
         }
 
-        ////////////////////////////////////////////////////////////////////////
-        // bootstrap sharding
-        let to_bootstrap = get_first_full_replicaset(instances, storage);
-        if let Err(e) = to_bootstrap {
-            tlog!(
-                Warning,
-                "failed checking if bucket bootstrapping is needed: {e}"
-            );
-            // TODO: don't hard code timeout
-            event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-            return Continue;
-        }
-        if let Ok(Some(Replicaset { master_id, .. })) = to_bootstrap {
-            // TODO: change `Info` to `Debug`
-            tlog!(Info, "bootstrapping bucket distribution";
-                "instance_id" => %master_id,
-            );
-            let res: Result<_> = async {
-                let req = sharding::bootstrap::Request {
-                    term,
-                    commit: raft_storage.commit()?.unwrap(),
-                    timeout: Self::SYNC_TIMEOUT,
-                };
-                pool.call(&master_id, &req)?
-                    // TODO: don't hard code timeout
-                    .timeout(Duration::from_secs(3))
-                    .await??;
-
-                let op = OpDML::replace(
-                    ClusterwideSpace::Property,
-                    &(PropertyName::VshardBootstrapped, true),
-                )?;
-                // TODO: don't hard code timeout
-                node.propose_and_wait(op, Duration::from_secs(3))??;
-
-                Ok(())
-            }
-            .await;
-            if let Err(e) = res {
-                tlog!(Warning, "failed bootstrapping bucket distribution: {e}");
-                // TODO: don't hard code timeout
-                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                return Continue;
-            }
-
-            // TODO: change `Info` to `Debug`
-            tlog!(Info, "bootstrapped bucket distribution";
-                "instance_id" => %master_id,
-            );
-
-            return Continue;
-        };
-
         ////////////////////////////////////////////////////////////////////////
         // sharding weights
         let to_update_weights = instances.iter().find(|instance| {
@@ -561,6 +525,7 @@ fn action_plan<'i>(
     replicasets: &HashMap<&ReplicasetId, &'i Replicaset>,
     my_raft_id: RaftId,
     vshard_bootstrapped: bool,
+    replication_factor: usize,
 ) -> Result<Plan<'i>> {
     ////////////////////////////////////////////////////////////////////////////
     // conf change
@@ -762,6 +727,26 @@ fn action_plan<'i>(
             .with_current_grade(CurrentGrade::sharding_initialized(target_grade.incarnation));
         return Ok(ShardingInit { targets, rpc, req }.into());
     }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // bootstrap sharding
+    let to_bootstrap = (!vshard_bootstrapped)
+        .then(|| get_first_full_replicaset(instances, replicasets, replication_factor))
+        .flatten();
+    if let Some(Replicaset { master_id, .. }) = to_bootstrap {
+        let target = master_id;
+        let rpc = sharding::bootstrap::Request {
+            term,
+            commit,
+            timeout: Loop::SYNC_TIMEOUT,
+        };
+        let op = OpDML::replace(
+            ClusterwideSpace::Property,
+            &(PropertyName::VshardBootstrapped, true),
+        )?;
+        return Ok(ShardingBoot { target, rpc, op }.into());
+    };
+
     Ok(Plan::None)
 }
 
@@ -855,15 +840,11 @@ fn get_weight_changes<'p>(
 }
 
 #[inline(always)]
-fn get_first_full_replicaset(
+fn get_first_full_replicaset<'r>(
     instances: &[Instance],
-    storage: &Clusterwide,
-) -> Result<Option<Replicaset>> {
-    if storage.properties.vshard_bootstrapped()? {
-        return Ok(None);
-    }
-
-    let replication_factor = storage.properties.replication_factor()?;
+    replicasets: &HashMap<&ReplicasetId, &'r Replicaset>,
+    replication_factor: usize,
+) -> Option<&'r Replicaset> {
     let mut replicaset_sizes = HashMap::new();
     let mut full_replicaset_id = None;
     for Instance { replicaset_id, .. } in maybe_responding(instances) {
@@ -874,9 +855,9 @@ fn get_first_full_replicaset(
         }
     }
 
-    let Some(replicaset_id) = full_replicaset_id else { return Ok(None); };
-    let res = storage.replicasets.get(replicaset_id)?;
-    Ok(res)
+    full_replicaset_id
+        .and_then(|id| replicasets.get(id))
+        .copied()
 }
 
 #[inline(always)]
@@ -963,5 +944,11 @@ mod actions {
             pub rpc: sharding::Request,
             pub req: update_instance::Request,
         }
+
+        pub struct ShardingBoot<'i> {
+            pub target: &'i InstanceId,
+            pub rpc: sharding::bootstrap::Request,
+            pub op: OpDML,
+        }
     }
 }
-- 
GitLab