From e4148f039bec415ece5db905c5494cf09e5d747c Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 14 Dec 2022 13:52:43 +0300
Subject: [PATCH] refactor(governor): plan for downgrading instances to
 Offline/Expelled

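Move the offline/expel handling out of the governor loop body and into
action_plan, which now returns a dedicated
Plan::ReconfigureShardingAndDowngrade variant. action_plan takes the
cluster_id so it can construct the update_instance::Request itself, and
the loop is left with only the side effects: fan out rpc::sharding to
all responding instances (now concurrently, via try_join_all instead of
call_all) and then persist the instance's new CurrentGrade.
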
---
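Note for reviewers (not part of the commit message): the sketch below
illustrates the "plan, then execute" split this patch converges on. The
types are simplified stand-ins, not the real picodata definitions.

    // Pure planning function: inspects state and returns a Plan value.
    // The governor loop only matches on the result and performs the
    // side effects, which keeps the decision logic unit-testable.
    enum Plan {
        None,
        ReconfigureShardingAndDowngrade { targets: Vec<String> },
    }

    fn action_plan(online: &[String], downgrading: Option<&str>) -> Plan {
        match downgrading {
            // Some instance wants to go Offline/Expelled: reconfigure
            // sharding on every responding instance, then downgrade it.
            Some(_) => Plan::ReconfigureShardingAndDowngrade {
                targets: online.to_vec(),
            },
            None => Plan::None,
        }
    }

    fn main() {
        let online = vec!["i1".to_string(), "i2".to_string()];
        match action_plan(&online, Some("i3")) {
            Plan::ReconfigureShardingAndDowngrade { targets } => {
                // The real loop fans out rpc::sharding here and then
                // persists the instance's new CurrentGrade.
                println!("reconfigure sharding on {targets:?}");
            }
            Plan::None => println!("nothing to do"),
        }
    }
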
 src/governor/mod.rs | 185 +++++++++++++++++++++++---------------------
 1 file changed, 96 insertions(+), 89 deletions(-)
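
A second sketch shows the concurrent fan-out performed in the new
ReconfigureShardingAndDowngrade branch: one future per target, fail
fast via try_join_all, and a single timeout over the whole batch. It
uses tokio and a hypothetical call_sharding stand-in, since picodata's
own fiber runtime and the rpc::sharding types are out of scope here.

    use std::time::Duration;

    use futures::future::try_join_all;
    use tokio::time::timeout;

    // Hypothetical stand-in for rpc::sharding; any fallible async
    // call works the same way.
    async fn call_sharding(instance_id: &str) -> Result<(), String> {
        println!("calling rpc::sharding on {instance_id}");
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let targets = ["i1", "i2", "i3"];
        let fs = targets.iter().map(|id| call_sharding(id));
        // The first error cancels the remaining calls; the timeout
        // caps the whole batch, mirroring the patch's
        // `try_join_all(fs).timeout(Duration::from_secs(3))`.
        timeout(Duration::from_secs(3), try_join_all(fs))
            .await
            .map_err(|_| "timed out".to_string())??;
        Ok(())
    }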

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 351c19995f..9af4fe0d95 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -19,6 +19,7 @@ use crate::traft::raft_storage::RaftSpaceAccess;
 use crate::traft::rpc;
 use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
 use crate::traft::rpc::{replication, sharding, sync, update_instance};
+use crate::traft::InstanceId;
 use crate::traft::OpDML;
 use crate::traft::RaftId;
 use crate::traft::RaftTerm;
@@ -30,7 +31,7 @@ use crate::unwrap_ok_or;
 
 use actions::*;
 
-use futures::future::join_all;
+use futures::future::{join_all, try_join_all};
 
 pub(crate) mod cc;
 pub(crate) mod migration;
@@ -69,6 +70,7 @@ impl Loop {
 
         let plan = action_plan(
             term,
+            cluster_id.clone(),
             instances,
             &voters,
             &learners,
@@ -147,91 +149,56 @@ impl Loop {
                 }
             }
 
-            Plan::None => {
-                tlog!(Info, "nothing to do");
-            }
-        }
+            Plan::ReconfigureShardingAndDowngrade(ReconfigureShardingAndDowngrade {
+                targets,
+                rpc,
+                req,
+            }) => {
+                tlog!(Info, "downgrading instance {}", req.instance_id);
 
-        ////////////////////////////////////////////////////////////////////////
-        // offline/expel
-        let to_offline = instances
-            .iter()
-            .filter(|instance| instance.current_grade != CurrentGradeVariant::Offline)
-            // TODO: process them all, not just the first one
-            .find(|instance| {
-                let (target, current) = (
-                    instance.target_grade.variant,
-                    instance.current_grade.variant,
-                );
-                matches!(target, TargetGradeVariant::Offline)
-                    || !matches!(current, CurrentGradeVariant::Expelled)
-                        && matches!(target, TargetGradeVariant::Expelled)
-            });
-        if let Some(instance) = to_offline {
-            tlog!(
-                Info,
-                "processing {} {} -> {}",
-                instance.instance_id,
-                instance.current_grade,
-                instance.target_grade
-            );
-
-            // reconfigure vshard storages and routers
-            let res: Result<_> = async {
-                let commit = raft_storage.commit()?.unwrap();
-                let reqs = maybe_responding(instances)
-                    .filter(|instance| {
-                        instance.current_grade == CurrentGradeVariant::ShardingInitialized
-                            || instance.current_grade == CurrentGradeVariant::Online
-                    })
-                    .map(|instance| {
-                        tlog!(Info,
-                            "calling rpc::sharding";
-                            "instance_id" => %instance.instance_id
-                        );
-                        (
-                            instance.instance_id.clone(),
-                            sharding::Request {
-                                term,
-                                commit,
-                                timeout: Self::SYNC_TIMEOUT,
-                            },
-                        )
-                    });
-                // TODO: don't hard code timeout
-                let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
-                for (_, resp) in res {
-                    let sharding::Response {} = resp?;
+                let res: Result<_> = async {
+                    tlog!(Info, "reconfiguring sharding");
+                    let mut fs = vec![];
+                    for instance_id in targets {
+                        tlog!(Info, "calling rpc::sharding"; "instance_id" => %instance_id);
+                        let resp = pool.call(instance_id, &rpc)?;
+                        fs.push(async move {
+                            resp.await.map_err(|e| {
+                                tlog!(Warning, "failed calling rpc::sharding: {e}";
+                                    "instance_id" => %instance_id
+                                );
+                                e
+                            })
+                        });
+                    }
+                    // TODO: don't hard code timeout
+                    try_join_all(fs).timeout(Duration::from_secs(3)).await??;
+                    Ok(())
+                }
+                .await;
+                if let Err(e) = res {
+                    tlog!(Warning, "failed reconfiguring sharding: {e}");
+                    event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
+                    return Continue;
                 }
-                Ok(())
-            }
-            .await;
-            if let Err(e) = res {
-                tlog!(Warning, "failed calling rpc::sharding: {e}");
-                // TODO: don't hard code timeout
-                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                return Continue;
-            }
 
-            // update instance's CurrentGrade
-            let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id)
-                .with_current_grade(instance.target_grade.into());
-            tlog!(Info,
-                "handling update_instance::Request";
-                "current_grade" => %req.current_grade.expect("just set"),
-                "instance_id" => %req.instance_id,
-            );
-            if let Err(e) = node.handle_update_instance_request_and_wait(req) {
-                tlog!(Warning,
-                    "failed handling update_instance::Request: {e}";
-                    "instance_id" => %instance.instance_id,
+                let instance_id = req.instance_id.clone();
+                tlog!(Info, "handling instance grade change";
+                    "instance_id" => %instance_id,
+                    "current_grade" => %req.current_grade.expect("must be set"),
                 );
-                // TODO: don't hard code timeout
-                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                return Continue;
+                if let Err(e) = node.handle_update_instance_request_and_wait(req) {
+                    tlog!(Warning, "failed handling instance grade change: {e}";
+                        "instance_id" => %instance_id,
+                    );
+                    event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
+                    return Continue;
+                }
             }
 
-            return Continue;
+            Plan::None => {
+                tlog!(Info, "nothing to do");
+            }
         }
 
         ////////////////////////////////////////////////////////////////////////
@@ -684,6 +651,7 @@ impl Loop {
 #[allow(clippy::too_many_arguments)]
 fn action_plan<'i>(
     term: RaftTerm,
+    cluster_id: String,
     instances: &'i [Instance],
     voters: &[RaftId],
     learners: &[RaftId],
@@ -692,11 +660,15 @@ fn action_plan<'i>(
     _storage: &Clusterwide,
     raft_storage: &RaftSpaceAccess,
 ) -> Result<Plan<'i>> {
+    ////////////////////////////////////////////////////////////////////////////
+    // conf change
     if let Some(conf_change) = raft_conf_change(instances, voters, learners) {
         return Ok(Plan::ConfChange(conf_change));
     }
 
-    let to_offline = instances
+    ////////////////////////////////////////////////////////////////////////////
+    // downgrading
+    let to_downgrade = instances
         .iter()
         .filter(|instance| instance.current_grade != CurrentGradeVariant::Offline)
         // TODO: process them all, not just the first one
@@ -709,9 +681,17 @@ fn action_plan<'i>(
                 || !matches!(current, CurrentGradeVariant::Expelled)
                     && matches!(target, TargetGradeVariant::Expelled)
         });
-    if let Some(instance) = to_offline {
+    if let Some(Instance {
+        raft_id,
+        instance_id,
+        replicaset_id,
+        target_grade,
+        ..
+    }) = to_downgrade
+    {
+        ////////////////////////////////////////////////////////////////////////
         // transfer leadership, if we're the one who goes offline
-        if instance.raft_id == my_raft_id {
+        if *raft_id == my_raft_id {
             let new_leader = maybe_responding(instances)
                 // FIXME: linear search
                 .find(|instance| voters.contains(&instance.raft_id));
@@ -727,13 +707,8 @@ fn action_plan<'i>(
             );
         }
 
+        ////////////////////////////////////////////////////////////////////////
         // choose a new replicaset master if needed and promote it
-        // TODO: move up
-        let Instance {
-            replicaset_id,
-            instance_id,
-            ..
-        } = instance;
         let replicaset = replicasets.get(replicaset_id);
         if matches!(replicaset, Some(replicaset) if replicaset.master_id == instance_id) {
             let new_master = maybe_responding(instances).find(|p| p.replicaset_id == replicaset_id);
@@ -754,6 +729,25 @@ fn action_plan<'i>(
                 "replicaset_id" => %replicaset_id,
             );
         }
+
+        ////////////////////////////////////////////////////////////////////////
+        // reconfigure vshard storages and routers
+        // and update instance's CurrentGrade afterwards
+        let targets = maybe_responding(instances)
+            .filter(|instance| {
+                instance.current_grade == CurrentGradeVariant::ShardingInitialized
+                    || instance.current_grade == CurrentGradeVariant::Online
+            })
+            .map(|instance| &instance.instance_id)
+            .collect();
+        let rpc = sharding::Request {
+            term,
+            commit: raft_storage.commit()?.unwrap(),
+            timeout: Loop::SYNC_TIMEOUT,
+        };
+        let req = update_instance::Request::new(instance_id.clone(), cluster_id)
+            .with_current_grade((*target_grade).into());
+        return Ok(ReconfigureShardingAndDowngrade { targets, rpc, req }.into());
     }
 
     Ok(Plan::None)
@@ -892,11 +886,18 @@ mod actions {
         pub op: OpDML,
     }
 
+    pub struct ReconfigureShardingAndDowngrade<'i> {
+        pub targets: Vec<&'i InstanceId>,
+        pub rpc: sharding::Request,
+        pub req: update_instance::Request,
+    }
+
     pub enum Plan<'i> {
         None,
         ConfChange(ConfChangeV2),
         TransferLeadership(TransferLeadership<'i>),
         TransferMastership(TransferMastership<'i>),
+        ReconfigureShardingAndDowngrade(ReconfigureShardingAndDowngrade<'i>),
     }
 
     impl From<ConfChangeV2> for Plan<'_> {
@@ -916,4 +917,10 @@ mod actions {
             Self::TransferMastership(a)
         }
     }
+
+    impl<'i> From<ReconfigureShardingAndDowngrade<'i>> for Plan<'i> {
+        fn from(a: ReconfigureShardingAndDowngrade<'i>) -> Self {
+            Self::ReconfigureShardingAndDowngrade(a)
+        }
+    }
 }
-- 
GitLab