From 99b03fe7bf34e37d9cc1ec626c8e70d4e7ffb443 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 16 Dec 2022 17:26:44 +0300
Subject: [PATCH] refactor(replicaset): add weight state & origin instead of
 target_weight

---
 src/governor/migration.rs |  3 +-
 src/governor/mod.rs       |  6 +--
 src/governor/plan.rs      | 78 +++++++++++++++++------------------
 src/replicaset.rs         | 85 ++++++++++++++++++++++++++++++++-------
 src/traft/rpc/sharding.rs |  2 +-
 test/int/test_basics.py   |  2 +-
 6 files changed, 112 insertions(+), 64 deletions(-)

diff --git a/src/governor/migration.rs b/src/governor/migration.rs
index 65f28248d1..f7aef01358 100644
--- a/src/governor/migration.rs
+++ b/src/governor/migration.rs
@@ -26,8 +26,7 @@ mod tests {
             replicaset_id: rid.into(),
             replicaset_uuid: Default::default(),
             master_id: String::default().into(),
-            current_weight: Default::default(),
-            target_weight: Default::default(),
+            weight: Default::default(),
             current_schema_version: mid,
         }
     }
diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index fc596416c2..17af1501c1 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -350,10 +350,10 @@ impl Loop {
                 }
             }
 
-            Plan::TargetWeights(TargetWeights { ops }) => {
+            Plan::ProposeWeightChanges(ProposeWeightChanges { ops }) => {
                 for op in ops {
                     governor_step! {
-                        "proposing target replicaset weights change"
+                        "proposing replicaset weight changes"
                         async {
                             node.propose_and_wait(op, Duration::from_secs(3))??;
                         }
@@ -361,7 +361,7 @@ impl Loop {
                 }
             }
 
-            Plan::CurrentWeights(CurrentWeights { targets, rpc, ops }) => {
+            Plan::UpdateWeights(UpdateWeights { targets, rpc, ops }) => {
                 governor_step! {
                     "updating sharding weights"
                     async {
diff --git a/src/governor/plan.rs b/src/governor/plan.rs
index e84408f26f..49a93f9e46 100644
--- a/src/governor/plan.rs
+++ b/src/governor/plan.rs
@@ -1,4 +1,5 @@
-use crate::replicaset::{Replicaset, ReplicasetId, Weight};
+use crate::replicaset::weight;
+use crate::replicaset::{Replicaset, ReplicasetId};
 use crate::storage::{ClusterwideSpace, PropertyName};
 use crate::tlog;
 use crate::traft::rpc;
@@ -9,7 +10,7 @@ use crate::traft::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant};
 use crate::traft::{Instance, InstanceId};
 use crate::traft::{RaftId, RaftIndex, RaftTerm};
 use ::tarantool::space::UpdateOps;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 use super::cc::raft_conf_change;
 use super::migration::get_pending_migration;
@@ -171,8 +172,11 @@ pub(super) fn action_plan<'i>(
                 replicaset_id: replicaset_id.clone(),
                 replicaset_uuid: replicaset_uuid.clone(),
                 master_id: master_id.clone(),
-                current_weight: weight,
-                target_weight: weight,
+                weight: weight::Info {
+                    value: weight,
+                    origin: weight::Origin::Auto,
+                    state: weight::State::Initial,
+                },
                 current_schema_version: 0,
             },
         )?;
@@ -255,23 +259,27 @@ pub(super) fn action_plan<'i>(
     };
 
     ////////////////////////////////////////////////////////////////////////////
-    // target sharding weights
-    let new_target_weights = get_target_weight_changes(instances, replicasets, replication_factor);
-    if let Some(new_target_weights) = new_target_weights {
+    // proposing automatic sharding weight changes
+    let to_change_weights = get_auto_weight_changes(instances, replicasets, replication_factor);
+    if !to_change_weights.is_empty() {
         let mut ops = vec![];
-        for (replicaset_id, weight) in new_target_weights {
+        for replicaset_id in to_change_weights {
             let mut uops = UpdateOps::new();
-            uops.assign("target_weight", weight)?;
+            uops.assign(weight::Value::PATH, 1.)?;
+            uops.assign(weight::State::PATH, weight::State::Updating)?;
             let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?;
             ops.push(op);
         }
-        return Ok(TargetWeights { ops }.into());
+        return Ok(ProposeWeightChanges { ops }.into());
     }
 
     ////////////////////////////////////////////////////////////////////////////
-    // current sharding weights
-    let new_current_weights = get_current_weight_changes(replicasets.values().copied());
-    if let Some(new_current_weights) = new_current_weights {
+    // applying proposed sharding weight changes
+    let to_update_weights: Vec<_> = replicasets
+        .values()
+        .filter_map(|r| (r.weight.state == weight::State::Updating).then_some(&r.replicaset_id))
+        .collect();
+    if !to_update_weights.is_empty() {
         let targets = maybe_responding(instances)
             .map(|instance| &instance.instance_id)
             .collect();
@@ -281,13 +289,13 @@ pub(super) fn action_plan<'i>(
             timeout: Loop::SYNC_TIMEOUT,
         };
         let mut ops = vec![];
-        for (replicaset_id, weight) in new_current_weights {
+        for replicaset_id in to_update_weights {
             let mut uops = UpdateOps::new();
-            uops.assign("current_weight", weight)?;
+            uops.assign(weight::State::PATH, weight::State::UpToDate)?;
             let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?;
             ops.push(op);
         }
-        return Ok(CurrentWeights { targets, rpc, ops }.into());
+        return Ok(UpdateWeights { targets, rpc, ops }.into());
     }
 
     ////////////////////////////////////////////////////////////////////////////
@@ -415,11 +423,11 @@ pub mod stage {
             pub op: OpDML,
         }
 
-        pub struct TargetWeights {
+        pub struct ProposeWeightChanges {
             pub ops: Vec<OpDML>,
         }
 
-        pub struct CurrentWeights<'i> {
+        pub struct UpdateWeights<'i> {
             pub targets: Vec<&'i InstanceId>,
             pub rpc: sharding::Request,
             pub ops: Vec<OpDML>,
@@ -438,41 +446,27 @@ pub mod stage {
 }
 
 #[inline(always)]
-fn get_target_weight_changes<'i>(
+fn get_auto_weight_changes<'i>(
     instances: &'i [Instance],
     replicasets: &HashMap<&ReplicasetId, &Replicaset>,
     replication_factor: usize,
-) -> Option<HashMap<&'i ReplicasetId, Weight>> {
+) -> HashSet<&'i ReplicasetId> {
     let mut replicaset_sizes = HashMap::new();
-    let mut weight_changes = HashMap::new();
+    let mut weight_changes = HashSet::new();
     for Instance { replicaset_id, .. } in maybe_responding(instances) {
         let replicaset_size = replicaset_sizes.entry(replicaset_id).or_insert(0);
         *replicaset_size += 1;
-        let Some(Replicaset {
-            current_weight,
-            target_weight,
-            ..
-        }) = replicasets.get(replicaset_id) else {
+        let Some(Replicaset { weight, .. }) = replicasets.get(replicaset_id) else {
             continue;
         };
-        if *replicaset_size >= replication_factor && *current_weight == 0. && *target_weight != 1. {
-            weight_changes.entry(replicaset_id).or_insert(1.);
+        if weight.origin == weight::Origin::User || weight.state == weight::State::Updating {
+            continue;
+        }
+        if *replicaset_size >= replication_factor && weight.value == 0. {
+            weight_changes.insert(replicaset_id);
         }
     }
-    (!weight_changes.is_empty()).then_some(weight_changes)
-}
-
-#[inline(always)]
-fn get_current_weight_changes<'r>(
-    replicasets: impl IntoIterator<Item = &'r Replicaset>,
-) -> Option<HashMap<&'r ReplicasetId, Weight>> {
-    let res: HashMap<_, _> = replicasets
-        .into_iter()
-        .filter_map(|r| {
-            (r.current_weight != r.target_weight).then_some((&r.replicaset_id, r.target_weight))
-        })
-        .collect();
-    (!res.is_empty()).then_some(res)
+    weight_changes
 }
 
 #[inline(always)]
diff --git a/src/replicaset.rs b/src/replicaset.rs
index 91ed247c5a..23f99cc429 100644
--- a/src/replicaset.rs
+++ b/src/replicaset.rs
@@ -1,5 +1,4 @@
 use crate::traft::InstanceId;
-use crate::util::Transition;
 use ::tarantool::tlua;
 use ::tarantool::tuple::Encode;
 
@@ -26,11 +25,8 @@ pub struct Replicaset {
     /// Instance id of the current replication leader.
     pub master_id: InstanceId,
 
-    /// Current sharding weight of the replicaset.
-    pub current_weight: Weight,
-
-    /// Target sharding weight of the replicaset.
-    pub target_weight: Weight,
+    /// Sharding weight of the replicaset.
+    pub weight: weight::Info,
 
     /// Current schema version of the replicaset.
     pub current_schema_version: u64,
@@ -44,8 +40,7 @@ impl Replicaset {
             Field::from(("replicaset_id", FieldType::String)),
             Field::from(("replicaset_uuid", FieldType::String)),
             Field::from(("master_id", FieldType::String)),
-            Field::from(("current_weight", FieldType::Double)),
-            Field::from(("target_weight", FieldType::Double)),
+            Field::from(("weight", FieldType::Array)),
             Field::from(("current_schema_version", FieldType::Unsigned)),
         ]
     }
@@ -56,13 +51,73 @@ impl std::fmt::Display for Replicaset {
         write!(
             f,
             "({}, master: {}, weight: {}, schema_version: {})",
-            self.replicaset_id,
-            self.master_id,
-            Transition {
-                from: self.current_weight,
-                to: self.target_weight
-            },
-            self.current_schema_version,
+            self.replicaset_id, self.master_id, self.weight, self.current_schema_version,
         )
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+/// Replicaset weight
+pub mod weight {
+    /// Replicaset weight info
+    #[derive(Default, Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
+    pub struct Info {
+        pub value: super::Weight,
+        pub origin: Origin,
+        pub state: State,
+    }
+
+    pub struct Value;
+    impl Value {
+        // FIXME: there's got to be a better way
+        pub const PATH: &str = "weight[1]";
+    }
+
+    ::tarantool::define_str_enum! {
+        /// Replicaset weight origin
+        #[derive(Default)]
+        pub enum Origin {
+            /// Weight is determined by governor.
+            #[default]
+            Auto = "Auto",
+
+            /// Weight is specified by user.
+            User = "User",
+        }
+    }
+    impl Origin {
+        // FIXME: there's got to be a better way
+        pub const PATH: &str = "weight[2]";
+    }
+
+    ::tarantool::define_str_enum! {
+        /// Replicaset weight state
+        #[derive(Default)]
+        pub enum State {
+            /// Weight is set to the inital value, which will be changed.
+            #[default]
+            Initial = "Initial",
+
+            /// Weight is in progress of being updated.
+            Updating = "Updating",
+
+            /// Weight doesn't need updating.
+            UpToDate = "UpToDate",
+        }
+    }
+    impl State {
+        // FIXME: there's got to be a better way
+        pub const PATH: &str = "weight[3]";
+    }
+
+    impl std::fmt::Display for Info {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            let Self {
+                value,
+                origin,
+                state,
+            } = self;
+            write!(f, "({origin}, {state}, {value})")
+        }
+    }
+}
diff --git a/src/traft/rpc/sharding.rs b/src/traft/rpc/sharding.rs
index 193f2089c9..716a430075 100644
--- a/src/traft/rpc/sharding.rs
+++ b/src/traft/rpc/sharding.rs
@@ -159,7 +159,7 @@ pub mod cfg {
                     continue;
                 };
                 let (weight, is_master) = match replicasets.get(&peer.replicaset_id) {
-                    Some(r) => (Some(r.target_weight), r.master_id == peer.instance_id),
+                    Some(r) => (Some(r.weight.value), r.master_id == peer.instance_id),
                     None => (None, false),
                 };
                 let replicaset = sharding.entry(peer.replicaset_uuid)
diff --git a/test/int/test_basics.py b/test/int/test_basics.py
index c835442503..8ae2b25ede 100644
--- a/test/int/test_basics.py
+++ b/test/int/test_basics.py
@@ -225,7 +225,7 @@ def test_raft_log(instance: Instance):
 |  6  | 2  |     |-|
 |  7  | 2  |1.1.1|PersistInstance(i1, 1, r1, Offline(0) -> Online(1), {b})|
 |  8  | 2  |1.1.2|PersistInstance(i1, 1, r1, RaftSynced(1) -> Online(1), {b})|
-|  9  | 2  |1.1.3|Insert(_picodata_replicaset, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1",1.0,1.0,0])|
+|  9  | 2  |1.1.3|Insert(_picodata_replicaset, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1",[1.0,"Auto","Initial"],0])|
 | 10  | 2  |1.1.4|PersistInstance(i1, 1, r1, Replicated(1) -> Online(1), {b})|
 | 11  | 2  |1.1.5|PersistInstance(i1, 1, r1, ShardingInitialized(1) -> Online(1), {b})|
 | 12  | 2  |1.1.6|Replace(_picodata_property, ["vshard_bootstrapped",true])|
-- 
GitLab