From defcac2257072fe7ea4d2823aa6248ba237019da Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 15 Dec 2022 16:19:29 +0300 Subject: [PATCH] refactor(replicaset): add current_weight & target_weight to Replicaset --- src/governor/migration.rs | 3 ++- src/governor/mod.rs | 4 +++- src/storage.rs | 9 ++------- src/traft/mod.rs | 18 ++++++++++++++---- src/traft/rpc/sharding.rs | 5 ++--- test/int/test_basics.py | 2 +- 6 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/governor/migration.rs b/src/governor/migration.rs index 98b4293bb5..6790da5e11 100644 --- a/src/governor/migration.rs +++ b/src/governor/migration.rs @@ -47,7 +47,8 @@ mod tests { replicaset_id: ReplicasetId($id.to_string()), replicaset_uuid: "".to_string(), master_id: InstanceId("i0".to_string()), - weight: 1.0, + current_weight: 1.0, + target_weight: 1.0, current_schema_version: $schema_version, } }; diff --git a/src/governor/mod.rs b/src/governor/mod.rs index d7b9d76f5d..6156674aa1 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -659,13 +659,15 @@ fn action_plan<'i>( commit, timeout: Loop::SYNC_TIMEOUT, }; + let weight = if vshard_bootstrapped { 0. } else { 1. }; let op = OpDML::insert( ClusterwideSpace::Replicaset, &Replicaset { replicaset_id: replicaset_id.clone(), replicaset_uuid: replicaset_uuid.clone(), master_id: master_id.clone(), - weight: if vshard_bootstrapped { 0. } else { 1. }, + current_weight: weight, + target_weight: weight, current_schema_version: 0, }, )?; diff --git a/src/storage.rs b/src/storage.rs index 2db5d86d40..35d4066d7a 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -4,7 +4,6 @@ use ::tarantool::tuple::{DecodeOwned, ToTupleBuffer, Tuple}; use crate::traft; use crate::traft::error::Error; -use crate::traft::rpc::sharding::cfg::ReplicasetWeights; use crate::traft::Migration; use crate::traft::RaftId; use crate::traft::Replicaset; @@ -198,7 +197,8 @@ impl Replicasets { .field(("replicaset_id", FieldType::String)) .field(("replicaset_uuid", FieldType::String)) .field(("master_id", FieldType::String)) - .field(("weight", FieldType::Double)) + .field(("current_weight", FieldType::Double)) + .field(("target_weight", FieldType::Double)) .field(("current_schema_version", FieldType::Unsigned)) .if_not_exists(true) .create()?; @@ -213,11 +213,6 @@ impl Replicasets { Ok(Self { space }) } - #[inline] - pub fn weights(&self) -> Result<ReplicasetWeights> { - Ok(self.iter()?.map(|r| (r.replicaset_id, r.weight)).collect()) - } - #[inline] pub fn get(&self, replicaset_id: &str) -> tarantool::Result<Option<Replicaset>> { match self.space.get(&[replicaset_id])? { diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 18e1da3791..29afcf060d 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -29,6 +29,7 @@ use protobuf::Message as _; pub use network::ConnectionPool; pub use raft_storage::RaftSpaceAccess; +pub use rpc::sharding::cfg::Weight; pub use rpc::{join, update_instance}; pub use topology::Topology; @@ -519,8 +520,11 @@ pub struct Replicaset { /// Instance id of the current replication leader. pub master_id: InstanceId, - /// Sharding weight of the replicaset. - pub weight: rpc::sharding::cfg::Weight, + /// Current sharding weight of the replicaset. + pub current_weight: Weight, + + /// Target sharding weight of the replicaset. + pub target_weight: Weight, /// Current schema version of the replicaset. pub current_schema_version: u64, @@ -531,8 +535,14 @@ impl std::fmt::Display for Replicaset { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( f, - "({}, master: {}, weight: {})", - self.replicaset_id, self.master_id, self.weight, + "({}, master: {}, weight: {}, schema_version: {})", + self.replicaset_id, + self.master_id, + Transition { + from: self.current_weight, + to: self.target_weight + }, + self.current_schema_version, ) } } diff --git a/src/traft/rpc/sharding.rs b/src/traft/rpc/sharding.rs index 5ad8a57faf..5629bef37a 100644 --- a/src/traft/rpc/sharding.rs +++ b/src/traft/rpc/sharding.rs @@ -87,7 +87,7 @@ pub mod bootstrap { pub mod cfg { use crate::storage::Clusterwide; use crate::storage::ToEntryIter as _; - use crate::traft::{Result, ReplicasetId}; + use crate::traft::Result; use ::tarantool::tlua; @@ -140,7 +140,6 @@ pub mod cfg { Once, } - pub type ReplicasetWeights = HashMap<ReplicasetId, Weight>; pub type Weight = f64; impl Cfg { @@ -161,7 +160,7 @@ pub mod cfg { continue; }; let (weight, is_master) = match replicasets.get(&peer.replicaset_id) { - Some(r) => (Some(r.weight), r.master_id == peer.instance_id), + Some(r) => (Some(r.target_weight), r.master_id == peer.instance_id), None => (None, false), }; let replicaset = sharding.entry(peer.replicaset_uuid) diff --git a/test/int/test_basics.py b/test/int/test_basics.py index c7c6f39440..c835442503 100644 --- a/test/int/test_basics.py +++ b/test/int/test_basics.py @@ -225,7 +225,7 @@ def test_raft_log(instance: Instance): | 6 | 2 | |-| | 7 | 2 |1.1.1|PersistInstance(i1, 1, r1, Offline(0) -> Online(1), {b})| | 8 | 2 |1.1.2|PersistInstance(i1, 1, r1, RaftSynced(1) -> Online(1), {b})| -| 9 | 2 |1.1.3|Insert(_picodata_replicaset, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1",1.0,0])| +| 9 | 2 |1.1.3|Insert(_picodata_replicaset, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1",1.0,1.0,0])| | 10 | 2 |1.1.4|PersistInstance(i1, 1, r1, Replicated(1) -> Online(1), {b})| | 11 | 2 |1.1.5|PersistInstance(i1, 1, r1, ShardingInitialized(1) -> Online(1), {b})| | 12 | 2 |1.1.6|Replace(_picodata_property, ["vshard_bootstrapped",true])| -- GitLab