From b387242875a96eff5b9a9366480d3edf4fe5185c Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 14 Dec 2022 17:16:37 +0300
Subject: [PATCH] refactor(governor): plan for init sharding

---
 src/governor/mod.rs | 130 +++++++++++++++++++++++++++-----------------
 1 file changed, 79 insertions(+), 51 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 151b1d1c25..a54f1cfea1 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -292,6 +292,49 @@ impl Loop {
                 }
             }
 
+            Plan::ShardingInit(ShardingInit { targets, rpc, req }) => {
+                governor_step! {
+                    "configuring sharding"
+                    async {
+                        let mut fs = vec![];
+                        for instance_id in targets {
+                            tlog!(Info, "calling rpc::sharding"; "instance_id" => %instance_id);
+                            let resp = pool.call(instance_id, &rpc)?;
+                            fs.push(async move {
+                                match resp.await {
+                                    Ok(_) => {
+                                        tlog!(Info, "configured sharding with instance";
+                                            "instance_id" => %instance_id,
+                                        );
+                                        Ok(())
+                                    }
+                                    Err(e) => {
+                                        tlog!(Warning, "failed calling rpc::sharding: {e}";
+                                            "instance_id" => %instance_id
+                                        );
+                                        Err(e)
+                                    }
+                                }
+                            });
+                        }
+                        // TODO: don't hard code timeout
+                        try_join_all(fs).timeout(Duration::from_secs(3)).await??
+                    }
+                }
+
+                let instance_id = req.instance_id.clone();
+                let current_grade = req.current_grade.expect("must be set");
+                governor_step! {
+                    "handling instance grade change" [
+                        "instance_id" => %instance_id,
+                        "current_grade" => %current_grade,
+                    ]
+                    async {
+                        node.handle_update_instance_request_and_wait(req)?
+                    }
+                }
+            }
+
             Plan::None => {
                 tlog!(Info, "nothing to do");
                 did_something = false;
@@ -302,57 +345,6 @@ impl Loop {
             return Continue;
         }
 
-        ////////////////////////////////////////////////////////////////////////
-        // init sharding
-        let to_shard = instances.iter().find(|instance| {
-            instance.has_grades(CurrentGradeVariant::Replicated, TargetGradeVariant::Online)
-        });
-        if let Some(instance) = to_shard {
-            let res: Result<_> = async {
-                let commit = raft_storage.commit()?.unwrap();
-                let reqs = maybe_responding(instances).map(|instance| {
-                    (
-                        instance.instance_id.clone(),
-                        sharding::Request {
-                            term,
-                            commit,
-                            timeout: Self::SYNC_TIMEOUT,
-                        },
-                    )
-                });
-                // TODO: don't hard code timeout
-                let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
-
-                for (instance_id, resp) in res {
-                    let sharding::Response {} = resp?;
-
-                    // TODO: change `Info` to `Debug`
-                    tlog!(Info, "initialized sharding with instance";
-                        "instance_id" => %instance_id,
-                    );
-                }
-
-                let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id)
-                    .with_current_grade(CurrentGrade::sharding_initialized(
-                        instance.target_grade.incarnation,
-                    ));
-                node.handle_update_instance_request_and_wait(req)?;
-
-                Ok(())
-            }
-            .await;
-            if let Err(e) = res {
-                tlog!(Warning, "failed to initialize sharding: {e}");
-                // TODO: don't hard code timeout
-                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                return Continue;
-            }
-
-            tlog!(Info, "sharding is initialized");
-
-            return Continue;
-        }
-
         ////////////////////////////////////////////////////////////////////////
         // bootstrap sharding
         let to_bootstrap = get_first_full_replicaset(instances, storage);
@@ -747,6 +739,29 @@ fn action_plan<'i>(
         return Ok(Replication { targets, rpc, req }.into());
     }
 
+    ////////////////////////////////////////////////////////////////////////////
+    // init sharding
+    let to_shard = instances.iter().find(|instance| {
+        instance.has_grades(CurrentGradeVariant::Replicated, TargetGradeVariant::Online)
+    });
+    if let Some(Instance {
+        instance_id,
+        target_grade,
+        ..
+    }) = to_shard
+    {
+        let targets = maybe_responding(instances)
+            .map(|instance| &instance.instance_id)
+            .collect();
+        let rpc = sharding::Request {
+            term,
+            commit,
+            timeout: Loop::SYNC_TIMEOUT,
+        };
+        let req = update_instance::Request::new(instance_id.clone(), cluster_id)
+            .with_current_grade(CurrentGrade::sharding_initialized(target_grade.incarnation));
+        return Ok(ShardingInit { targets, rpc, req }.into());
+    }
     Ok(Plan::None)
 }
 
@@ -908,6 +923,12 @@ mod actions {
         pub req: update_instance::Request,
     }
 
+    pub struct ShardingInit<'i> { // Data carried by the `Plan::ShardingInit` governor action.
+        pub targets: Vec<&'i InstanceId>, // Ids of the instances that `rpc` is sent to.
+        pub rpc: sharding::Request, // Request passed to `pool.call` for each of `targets`.
+        pub req: update_instance::Request, // Grade update submitted after the RPC step.
+    }
+
     pub enum Plan<'i> {
         None,
         ConfChange(ConfChangeV2),
@@ -917,6 +938,7 @@ mod actions {
         RaftSync(RaftSync<'i>),
         CreateReplicaset(CreateReplicaset<'i>),
         Replication(Replication<'i>),
+        ShardingInit(ShardingInit<'i>),
     }
 
     impl From<ConfChangeV2> for Plan<'_> {
@@ -960,4 +982,10 @@ mod actions {
             Self::Replication(a)
         }
     }
+
+    impl<'i> From<ShardingInit<'i>> for Plan<'i> { // Lets `action_plan` finish with `.into()`.
+        fn from(a: ShardingInit<'i>) -> Self {
+            Self::ShardingInit(a) // Wrap the action in the enum variant of the same name.
+        }
+    }
 }
-- 
GitLab