From 8f695db7b4a33ceb93c9e4279be05f3bcaef29a8 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 15 Dec 2022 16:21:04 +0300
Subject: [PATCH] refactor(governor): plan for changing replicaset weights & to
 online

Introduce three new governor plan variants:

- TargetWeights: propose updates to the replicasets' target_weight;
- CurrentWeights: call rpc::sharding on all responding instances, then
  propose the corresponding current_weight updates;
- ToOnline: raise the current grade of instances which have current grade
  ShardingInitialized and target grade Online.

This supersedes the ad-hoc "sharding weights" step in Loop::iter_fn: the
call_all helper is removed and get_weight_changes is split into
get_target_weight_changes & get_current_weight_changes.

---
 src/governor/mod.rs | 292 +++++++++++++++++++++++++-------------------
 1 file changed, 165 insertions(+), 127 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 6156674aa1..baf630bbd9 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::iter::repeat;
 use std::time::Duration;
 
 use ::tarantool::fiber;
@@ -12,12 +11,12 @@ use crate::r#loop::FlowControl::{self, Continue};
 use crate::storage::ToEntryIter as _;
 use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName};
 use crate::tlog;
-use crate::traft::network::{ConnectionPool, IdOfInstance};
+use crate::traft::network::ConnectionPool;
 use crate::traft::node::global;
 use crate::traft::node::Status;
 use crate::traft::raft_storage::RaftSpaceAccess;
 use crate::traft::rpc;
-use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
+use crate::traft::rpc::sharding::cfg::Weight;
 use crate::traft::rpc::{replication, sharding, sync, update_instance};
 use crate::traft::InstanceId;
 use crate::traft::OpDML;
@@ -32,7 +31,7 @@ use crate::unwrap_ok_or;
 
 use actions::*;
 
-use futures::future::{join_all, try_join_all};
+use futures::future::try_join_all;
 
 pub(crate) mod cc;
 pub(crate) mod migration;
@@ -352,99 +351,78 @@ impl Loop {
                 }
             }
 
-            Plan::None => {
-                tlog!(Info, "nothing to do");
-                did_something = false;
+            Plan::TargetWeights(TargetWeights { ops }) => {
+                for op in ops {
+                    governor_step! {
+                        "proposing target replicaset weights change"
+                        async {
+                            node.propose_and_wait(op, Duration::from_secs(3))??;
+                        }
+                    }
+                }
             }
-        }
-        if did_something {
-            return Continue;
-        }
-
-        ////////////////////////////////////////////////////////////////////////
-        // sharding weights
-        let to_update_weights = instances.iter().find(|instance| {
-            instance.has_grades(
-                CurrentGradeVariant::ShardingInitialized,
-                TargetGradeVariant::Online,
-            )
-        });
-        if let Some(instance) = to_update_weights {
-            let res = if let Some(added_weights) =
-                get_weight_changes(maybe_responding(instances), storage)
-            {
-                async {
-                    for (replicaset_id, weight) in added_weights {
-                        let mut ops = UpdateOps::new();
-                        ops.assign("weight", weight)?;
-                        node.propose_and_wait(
-                            OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?,
-                            // TODO: don't hard code the timeout
-                            Duration::from_secs(3),
-                        )??;
+            Plan::CurrentWeights(CurrentWeights { targets, rpc, ops }) => {
+                governor_step! {
+                    "updating sharding weights"
+                    async {
+                        let mut fs = vec![];
+                        for instance_id in targets {
+                            tlog!(Info, "calling rpc::sharding"; "instance_id" => %instance_id);
+                            let resp = pool.call(instance_id, &rpc)?;
+                            fs.push(async move {
+                                match resp.await {
+                                    Ok(_) => {
+                                        tlog!(Info, "updated weights on instance";
+                                            "instance_id" => %instance_id,
+                                        );
+                                        Ok(())
+                                    }
+                                    Err(e) => {
+                                        tlog!(Warning, "failed calling rpc::sharding: {e}";
+                                            "instance_id" => %instance_id
+                                        );
+                                        Err(e)
+                                    }
+                                }
+                            });
+                        }
+                        // TODO: don't hard code timeout
+                        try_join_all(fs).timeout(Duration::from_secs(3)).await??
                     }
+                }
 
-                    let instance_ids =
-                        maybe_responding(instances).map(|instance| instance.instance_id.clone());
-                    let commit = raft_storage.commit()?.unwrap();
-                    let reqs = instance_ids.zip(repeat(sharding::Request {
-                        term,
-                        commit,
-                        timeout: Self::SYNC_TIMEOUT,
-                    }));
-                    // TODO: don't hard code timeout
-                    let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
-
-                    for (instance_id, resp) in res {
-                        resp?;
-                        // TODO: change `Info` to `Debug`
-                        tlog!(Info, "instance is online"; "instance_id" => %instance_id);
-                    }
+                for op in ops {
+                    governor_step! {
+                        "proposing current replicaset weights change"
+                        async {
+                            node.propose_and_wait(op, Duration::from_secs(3))??;
+                        }
+                    }
+                }
+            }
 
-                    let req =
-                        update_instance::Request::new(instance.instance_id.clone(), cluster_id)
-                            .with_current_grade(CurrentGrade::online(
-                                instance.target_grade.incarnation,
-                            ));
-                    node.handle_update_instance_request_and_wait(req)?;
-                    Ok(())
-                }
-                .await
-            } else {
-                (|| -> Result<()> {
-                    let to_online = instances.iter().filter(|instance| {
-                        instance.has_grades(
-                            CurrentGradeVariant::ShardingInitialized,
-                            TargetGradeVariant::Online,
-                        )
-                    });
-                    for Instance {
-                        instance_id,
-                        target_grade,
-                        ..
-                    } in to_online
-                    {
-                        let cluster_id = cluster_id.clone();
-                        let req = update_instance::Request::new(instance_id.clone(), cluster_id)
-                            .with_current_grade(CurrentGrade::online(target_grade.incarnation));
-                        node.handle_update_instance_request_and_wait(req)?;
-                        // TODO: change `Info` to `Debug`
-                        tlog!(Info, "instance is online"; "instance_id" => %instance_id);
-                    }
-                    Ok(())
-                })()
-            };
-            if let Err(e) = res {
-                tlog!(Warning, "updating sharding weights failed: {e}");
+            Plan::ToOnline(ToOnline { req }) => {
+                let instance_id = req.instance_id.clone();
+                let current_grade = req.current_grade.expect("must be set");
+                governor_step! {
+                    "handling instance grade change" [
+                        "instance_id" => %instance_id,
+                        "current_grade" => %current_grade,
+                    ]
+                    async {
+                        node.handle_update_instance_request_and_wait(req)?
+                    }
+                }
+            }
 
-                // TODO: don't hard code timeout
-                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                return Continue;
-            }
-
-            tlog!(Info, "sharding is configured");
-
+            Plan::None => {
+                tlog!(Info, "nothing to do");
+                did_something = false;
+            }
+        }
+        if did_something {
             return Continue;
         }
@@ -533,6 +511,8 @@ fn action_plan<'i>(
         return Ok(Plan::ConfChange(ConfChange { conf_change }));
     }
 
+    // TODO: reduce number of iterations over all instances
+
     ////////////////////////////////////////////////////////////////////////////
     // downgrading
     let to_downgrade = instances
@@ -749,6 +729,61 @@ fn action_plan<'i>(
         return Ok(ShardingBoot { target, rpc, op }.into());
     };
 
+    ////////////////////////////////////////////////////////////////////////////
+    // target sharding weights
+    let new_target_weights = get_target_weight_changes(instances, replicasets, replication_factor);
+    if let Some(new_target_weights) = new_target_weights {
+        let mut ops = vec![];
+        for (replicaset_id, weight) in new_target_weights {
+            let mut uops = UpdateOps::new();
+            uops.assign("target_weight", weight)?;
+            let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?;
+            ops.push(op);
+        }
+        return Ok(TargetWeights { ops }.into());
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // current sharding weights
+    let new_current_weights = get_current_weight_changes(replicasets.values().copied());
+    if let Some(new_current_weights) = new_current_weights {
+        let targets = maybe_responding(instances)
+            .map(|instance| &instance.instance_id)
+            .collect();
+        let rpc = sharding::Request {
+            term,
+            commit,
+            timeout: Loop::SYNC_TIMEOUT,
+        };
+        let mut ops = vec![];
+        for (replicaset_id, weight) in new_current_weights {
+            let mut uops = UpdateOps::new();
+            uops.assign("current_weight", weight)?;
+            let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?;
+            ops.push(op);
+        }
+        return Ok(CurrentWeights { targets, rpc, ops }.into());
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // to online
+    let to_online = instances.iter().find(|instance| {
+        instance.has_grades(
+            CurrentGradeVariant::ShardingInitialized,
+            TargetGradeVariant::Online,
+        )
+    });
+    if let Some(Instance {
+        instance_id,
+        target_grade,
+        ..
+    }) = to_online
+    {
+        let req = update_instance::Request::new(instance_id.clone(), cluster_id)
+            .with_current_grade(CurrentGrade::online(target_grade.incarnation));
+        return Ok(ToOnline { req }.into());
+    }
+
     Ok(Plan::None)
 }
 
@@ -792,55 +827,44 @@ struct State {
     pool: ConnectionPool,
 }
 
-#[allow(clippy::type_complexity)]
-async fn call_all<R, I>(
-    pool: &mut ConnectionPool,
-    reqs: impl IntoIterator<Item = (I, R)>,
-    timeout: Duration,
-) -> Result<Vec<(I, Result<R::Response>)>>
-where
-    R: rpc::Request,
-    I: IdOfInstance + 'static,
-{
-    let reqs = reqs.into_iter().collect::<Vec<_>>();
-    if reqs.is_empty() {
-        return Ok(vec![]);
-    }
-    let mut fs = vec![];
-    let mut ids = vec![];
-    for (id, req) in reqs {
-        fs.push(pool.call(&id, &req)?);
-        ids.push(id);
-    }
-    let responses = join_all(fs).timeout(timeout).await?;
-    Ok(ids.into_iter().zip(responses).collect())
-}
-
 #[inline(always)]
-fn get_weight_changes<'p>(
-    instances: impl IntoIterator<Item = &'p Instance>,
-    storage: &Clusterwide,
-) -> Option<ReplicasetWeights> {
-    let replication_factor = storage
-        .properties
-        .replication_factor()
-        .expect("storage error");
-    let replicaset_weights = storage.replicasets.weights().expect("storage error");
+fn get_target_weight_changes<'i>(
+    instances: &'i [Instance],
+    replicasets: &HashMap<&ReplicasetId, &Replicaset>,
+    replication_factor: usize,
+) -> Option<HashMap<&'i ReplicasetId, Weight>> {
     let mut replicaset_sizes = HashMap::new();
     let mut weight_changes = HashMap::new();
-    for instance @ Instance { replicaset_id, .. } in instances {
-        if !instance.may_respond() {
-            continue;
-        }
-        let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0);
+    for Instance { replicaset_id, .. } in instances {
+        let replicaset_size = replicaset_sizes.entry(replicaset_id).or_insert(0);
         *replicaset_size += 1;
-        if *replicaset_size >= replication_factor && replicaset_weights[replicaset_id] == 0. {
-            weight_changes.entry(replicaset_id.clone()).or_insert(1.);
+        let Some(Replicaset {
+            current_weight,
+            target_weight,
+            ..
+        }) = replicasets.get(replicaset_id) else {
+            continue;
+        };
+        if *replicaset_size >= replication_factor && *current_weight == 0. && *target_weight != 1. {
+            weight_changes.entry(replicaset_id).or_insert(1.);
         }
     }
     (!weight_changes.is_empty()).then_some(weight_changes)
 }
 
+#[inline(always)]
+fn get_current_weight_changes<'r>(
+    replicasets: impl IntoIterator<Item = &'r Replicaset>,
+) -> Option<HashMap<&'r ReplicasetId, Weight>> {
+    let res: HashMap<_, _> = replicasets
+        .into_iter()
+        .filter_map(|r| {
+            (r.current_weight != r.target_weight).then_some((&r.replicaset_id, r.target_weight))
+        })
+        .collect();
+    (!res.is_empty()).then_some(res)
+}
+
 #[inline(always)]
 fn get_first_full_replicaset<'r>(
     instances: &[Instance],
@@ -952,5 +976,19 @@ mod actions {
         pub rpc: sharding::bootstrap::Request,
         pub op: OpDML,
     }
+
+    pub struct TargetWeights {
+        pub ops: Vec<OpDML>,
+    }
+
+    pub struct CurrentWeights<'i> {
+        pub targets: Vec<&'i InstanceId>,
+        pub rpc: sharding::Request,
+        pub ops: Vec<OpDML>,
+    }
+
+    pub struct ToOnline {
+        pub req: update_instance::Request,
+    }
 }
 }
-- 
GitLab