diff --git a/src/instance.rs b/src/instance.rs index dbd4df49c443c556a7f514f3b37aa0ac2c178943..ceebcc1d941e88522ca4d9d8a33288f9a5098edd 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -4,11 +4,11 @@ use self::grade::{CurrentGradeVariant, Grade, TargetGradeVariant}; use super::failure_domain::FailureDomain; use super::replicaset::ReplicasetId; +use crate::has_grades; use crate::rpc; use crate::storage::Clusterwide; use crate::traft::{instance_uuid, replicaset_uuid, RaftId}; use crate::util::{Transition, Uppercase}; -use crate::{has_grades, traft::node::Node}; use ::serde::{Deserialize, Serialize}; use ::tarantool::tlua; use ::tarantool::tuple::Encode; @@ -246,6 +246,20 @@ pub fn check_required_failure_domain( Err(format!("missing failure domain names: {}", res.join(", "))) } +pub fn replication_ids(replicaset_id: &ReplicasetId, storage: &Clusterwide) -> HashSet<RaftId> { + if let Some(replication_ids) = storage.cache().replicasets.get(replicaset_id) { + replication_ids + .iter() + .map(|id| { + let instance = storage.instances.get(id).expect("storage should not fail"); + instance.raft_id + }) + .collect() + } else { + HashSet::new() + } +} + impl std::fmt::Display for Instance { #[rustfmt::skip] fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -264,17 +278,16 @@ impl std::fmt::Display for Instance { mod tests { use std::collections::HashSet; - use super::Topology; - use crate::failure_domain::FailureDomain; use crate::instance::grade::{CurrentGrade, Grade, TargetGrade, TargetGradeVariant}; use crate::replicaset::ReplicasetId; use crate::storage::Clusterwide; use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; - use crate::traft::Instance; use crate::rpc; + use super::*; + trait IntoGrade<T> { fn into_grade(self) -> Grade<T>; } @@ -341,56 +354,27 @@ mod tests { }; } - macro_rules! build_instance { - ( - $topology:expr, - $instance_id:expr, - $replicaset_id:expr - $(, $failure_domain:expr )? - $(,)? - ) => { - { - let _f = FailureDomain::default(); - $(let _f = $failure_domain; )? - $topology.build_instance( - $instance_id.map(<&str>::into).as_ref(), - $replicaset_id.map(<&str>::into).as_ref(), - &_f - ) - } - }; - } - - macro_rules! set_grade { - ( - $topology:expr, - $instance_id:expr - $(, $current_grade:expr - $(, $target_grade:expr)?)? - $(,)? - ) => { - { - let req = rpc::update_instance::Request::new($instance_id.into(), "".into()); - $( - let req = $current_grade.modify(req); - $( let req = $target_grade.modify(req); )? - )? - $topology.build_updated_instance(&req) - } - }; + fn set_grade( + instance: Instance, + storage: &Clusterwide, + grade: impl ModifyUpdateInstanceRequest, + ) -> Result<Instance, String> { + let req = rpc::update_instance::Request::new(instance.instance_id.clone(), "".into()); + let req = grade.modify(req); + instance.update(&req, storage) } - macro_rules! set_faildoms { - ( - $topology:expr, - $instance_id:expr, - $failure_domain:expr $(,)? - ) => { - $topology.build_updated_instance( - &rpc::update_instance::Request::new($instance_id.into(), "".into()) - .with_failure_domain($failure_domain), - ) - }; + fn set_faildoms( + instance: Instance, + storage: &Clusterwide, + failure_domain: FailureDomain, + ) -> Result<Instance, String> { + let instance_id = instance.instance_id.clone(); + instance.update( + &rpc::update_instance::Request::new(instance_id, "".into()) + .with_failure_domain(failure_domain), + storage, + ) } macro_rules! faildoms { @@ -400,56 +384,55 @@ mod tests { } } - fn new_topology(storage: &Clusterwide, instances: Vec<Instance>, replication_factor: usize) -> Topology { + fn setup_storage(storage: &Clusterwide, instances: Vec<Instance>, replication_factor: usize) { for instance in instances { storage.instances.put(&instance).unwrap(); } storage.properties.put(crate::storage::PropertyName::ReplicationFactor, &replication_factor).unwrap(); - Topology::from(storage.clone()) } #[::tarantool::test] fn test_simple() { let storage = Clusterwide::new().unwrap(); - let mut topology = new_topology(&storage, vec![], 1); + setup_storage(&storage, vec![], 1); - let instance = build_instance!(topology, None, None).unwrap(); + let instance = Instance::new(None, None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), ); storage.instances.put(&instance).unwrap(); - topology.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); - let instance = build_instance!(topology, None, None).unwrap(); + let instance = Instance::new(None, None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), ); storage.instances.put(&instance).unwrap(); - topology.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); - let instance = build_instance!(topology, None, Some("R3")).unwrap(); + let instance = Instance::new(None, Some(&ReplicasetId::from("R3")), &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, instance!(3, "i3", "R3", CurrentGrade::offline(0), TargetGrade::offline(0)), ); storage.instances.put(&instance).unwrap(); - topology.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); - let instance = build_instance!(topology, Some("I4"), None).unwrap(); + let instance = Instance::new(Some(&InstanceId::from("I4")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, instance!(4, "I4", "r3", CurrentGrade::offline(0), TargetGrade::offline(0)), ); storage.instances.put(&instance).unwrap(); - topology.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); } #[::tarantool::test] fn test_override() { let storage = Clusterwide::new().unwrap(); - let topology = new_topology(&storage, instances![ + setup_storage(&storage, instances![ (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), (2, "i2", "r2-original", CurrentGrade::offline(0), TargetGrade::offline(0)), ], @@ -461,7 +444,7 @@ mod tests { // unreachable soon (when we implement failover) an the error // will be gone. assert_eq!( - build_instance!(topology, Some("i1"), None) + Instance::new(Some(&InstanceId::from("i1")), None, &FailureDomain::default(), &storage) .unwrap_err(), "i1 is already joined", ); @@ -477,13 +460,13 @@ mod tests { // - Even if it's an impostor, rely on auto-expel policy. // Disruption isn't destructive if auto-expel allows (TODO). assert_eq!( - build_instance!(topology, Some("i2"), None).unwrap(), + Instance::new(Some(&InstanceId::from("i2")), None, &FailureDomain::default(), &storage).unwrap(), (instance!(3, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0))), // Attention: generated replicaset_id differs from the // original one, as well as raft_id. // That's a desired behavior. ); - assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r1")), HashSet::from([1])); + assert_eq!(replication_ids(&ReplicasetId::from("r1"), &storage), HashSet::from([1])); // TODO // @@ -501,14 +484,14 @@ mod tests { #[::tarantool::test] fn test_instance_id_collision() { let storage = Clusterwide::new().unwrap(); - let topology = new_topology(&storage, instances![ + setup_storage(&storage, instances![ (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), (2, "i3", "r3", CurrentGrade::online(1), TargetGrade::online(1)), // Attention: i3 has raft_id=2 ], 1); assert_eq!( - build_instance!(topology, None, Some("r2")).unwrap(), + Instance::new(None, Some(&ReplicasetId::from("r2")), &FailureDomain::default(), &storage).unwrap(), instance!(3, "i3-2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), ); } @@ -516,114 +499,114 @@ mod tests { #[::tarantool::test] fn test_replication_factor() { let storage = Clusterwide::new().unwrap(); - let mut topology = new_topology(&storage, instances![ + setup_storage(&storage, instances![ (9, "i9", "r9", CurrentGrade::online(1), TargetGrade::online(1)), (10, "i10", "r9", CurrentGrade::online(1), TargetGrade::online(1)), ], 2); - let instance = build_instance!(topology, Some("i1"), None).unwrap(); + let instance = Instance::new(Some(&InstanceId::from("i1")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, instance!(11, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), ); storage.instances.put(&instance).unwrap(); - topology.update(instance, None); - assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r1")), HashSet::from([11])); + storage.cache_mut().on_instance_change(instance, None); + assert_eq!(replication_ids(&ReplicasetId::from("r1"), &storage), HashSet::from([11])); - let instance = build_instance!(topology, Some("i2"), None).unwrap(); + let instance = Instance::new(Some(&InstanceId::from("i2")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, instance!(12, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), ); storage.instances.put(&instance).unwrap(); - topology.update(instance, None); - assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r1")), HashSet::from([11, 12])); + storage.cache_mut().on_instance_change(instance, None); + assert_eq!(replication_ids(&ReplicasetId::from("r1"), &storage), HashSet::from([11, 12])); - let instance = build_instance!(topology, Some("i3"), None).unwrap(); + let instance = Instance::new(Some(&InstanceId::from("i3")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, instance!(13, "i3", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), ); storage.instances.put(&instance).unwrap(); - topology.update(instance, None); - assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r2")), HashSet::from([13])); + storage.cache_mut().on_instance_change(instance, None); + assert_eq!(replication_ids(&ReplicasetId::from("r2"), &storage), HashSet::from([13])); - let instance = build_instance!(topology, Some("i4"), None).unwrap(); + let instance = Instance::new(Some(&InstanceId::from("i4")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, instance!(14, "i4", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), ); storage.instances.put(&instance).unwrap(); - topology.update(instance, None); - assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r2")), HashSet::from([13, 14])); + storage.cache_mut().on_instance_change(instance, None); + assert_eq!(replication_ids(&ReplicasetId::from("r2"), &storage), HashSet::from([13, 14])); } #[::tarantool::test] fn test_update_grade() { let storage = Clusterwide::new().unwrap(); let instance_v0 = instance!(1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)); - let mut topology = new_topology(&storage, vec![instance_v0.clone()], 1); + setup_storage(&storage, vec![instance_v0.clone()], 1); // Current grade incarnation is allowed to go down, // governor has the authority over it - let instance_v1 = set_grade!(topology, "i1", CurrentGrade::offline(0)).unwrap(); + let instance_v1 = set_grade(instance_v0.clone(), &storage, CurrentGrade::offline(0)).unwrap(); storage.instances.put(&instance_v1).unwrap(); - topology.update(instance_v1.clone(), Some(instance_v0)); + storage.cache_mut().on_instance_change(instance_v1.clone(), Some(instance_v0)); assert_eq!( instance_v1, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), ); // idempotency - let instance_v2 = set_grade!(topology, "i1", CurrentGrade::offline(0)).unwrap(); + let instance_v2 = set_grade(instance_v1.clone(), &storage, CurrentGrade::offline(0)).unwrap(); storage.instances.put(&instance_v2).unwrap(); - topology.update(instance_v2.clone(), Some(instance_v1)); + storage.cache_mut().on_instance_change(instance_v2.clone(), Some(instance_v1)); assert_eq!( instance_v2, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), ); // TargetGradeVariant::Offline takes incarnation from current grade - let instance_v3 = set_grade!(topology, "i1", TargetGradeVariant::Offline).unwrap(); + let instance_v3 = set_grade(instance_v2.clone(), &storage, TargetGradeVariant::Offline).unwrap(); storage.instances.put(&instance_v3).unwrap(); - topology.update(instance_v3.clone(), Some(instance_v2)); + storage.cache_mut().on_instance_change(instance_v3.clone(), Some(instance_v2)); assert_eq!( instance_v3, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), ); // TargetGradeVariant::Online increases incarnation - let instance_v4 = set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap(); + let instance_v4 = set_grade(instance_v3.clone(), &storage, TargetGradeVariant::Online).unwrap(); storage.instances.put(&instance_v4).unwrap(); - topology.update(instance_v4.clone(), Some(instance_v3)); + storage.cache_mut().on_instance_change(instance_v4.clone(), Some(instance_v3)); assert_eq!( instance_v4, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), ); // No idempotency, incarnation goes up - let instance_v5 = set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap(); + let instance_v5 = set_grade(instance_v4.clone(), &storage, TargetGradeVariant::Online).unwrap(); storage.instances.put(&instance_v5).unwrap(); - topology.update(instance_v5.clone(), Some(instance_v4)); + storage.cache_mut().on_instance_change(instance_v5.clone(), Some(instance_v4)); assert_eq!( instance_v5, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(2)), ); // TargetGrade::Expelled takes incarnation from current grade - let instance_v6 = set_grade!(topology, "i1", TargetGradeVariant::Expelled).unwrap(); + let instance_v6 = set_grade(instance_v5.clone(), &storage, TargetGradeVariant::Expelled).unwrap(); storage.instances.put(&instance_v6).unwrap(); - topology.update(instance_v6.clone(), Some(instance_v5)); + storage.cache_mut().on_instance_change(instance_v6.clone(), Some(instance_v5)); assert_eq!( instance_v6, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::expelled(0)), ); // Instance get's expelled - let instance_v7 = set_grade!(topology, "i1", CurrentGrade::expelled(69)).unwrap(); + let instance_v7 = set_grade(instance_v6.clone(), &storage, CurrentGrade::expelled(69)).unwrap(); storage.instances.put(&instance_v7).unwrap(); - topology.update(instance_v7.clone(), Some(instance_v6)); + storage.cache_mut().on_instance_change(instance_v7.clone(), Some(instance_v6)); assert_eq!( instance_v7, instance!(1, "i1", "r1", CurrentGrade::expelled(69), TargetGrade::expelled(0)), @@ -631,7 +614,7 @@ mod tests { // Updating expelled instances isn't allowed assert_eq!( - set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap_err(), + set_grade(instance_v7, &storage, TargetGradeVariant::Online).unwrap_err(), "cannot update expelled instance \"i1\"", ); } @@ -639,72 +622,72 @@ mod tests { #[::tarantool::test] fn failure_domain() { let storage = Clusterwide::new().unwrap(); - let mut t = new_topology(&storage, vec![], 3); + setup_storage(&storage, vec![], 3); let instance = - build_instance!(t, None, None, faildoms! {planet: Earth}) + Instance::new(None, None, &faildoms! {planet: Earth}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r1"); storage.instances.put(&instance).unwrap(); - t.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); let instance = - build_instance!(t, None, None, faildoms! {planet: Earth}) + Instance::new(None, None, &faildoms! {planet: Earth}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r2"); storage.instances.put(&instance).unwrap(); - t.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); let instance = - build_instance!(t, None, None, faildoms! {planet: Mars}) + Instance::new(None, None, &faildoms! {planet: Mars}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r1"); storage.instances.put(&instance).unwrap(); - t.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); let instance = - build_instance!(t, None, None, faildoms! {planet: Earth, os: BSD}) + Instance::new(None, None, &faildoms! {planet: Earth, os: BSD}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r3"); storage.instances.put(&instance).unwrap(); - t.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); let instance = - build_instance!(t, None, None, faildoms! {planet: Mars, os: BSD}) + Instance::new(None, None, &faildoms! {planet: Mars, os: BSD}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r2"); storage.instances.put(&instance).unwrap(); - t.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); assert_eq!( - build_instance!(t, None, None, faildoms! {os: Arch}) + Instance::new(None, None, &faildoms! {os: Arch}, &storage) .unwrap_err(), "missing failure domain names: PLANET", ); let instance = - build_instance!(t, None, None, faildoms! {planet: Venus, os: Arch}) + Instance::new(None, None, &faildoms! {planet: Venus, os: Arch}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r1"); storage.instances.put(&instance).unwrap(); - t.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); let instance = - build_instance!(t, None, None, faildoms! {planet: Venus, os: Mac}) + Instance::new(None, None, &faildoms! {planet: Venus, os: Mac}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r2"); storage.instances.put(&instance).unwrap(); - t.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); let instance = - build_instance!(t, None, None, faildoms! {planet: Mars, os: Mac}) + Instance::new(None, None, &faildoms! {planet: Mars, os: Mac}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r3"); storage.instances.put(&instance).unwrap(); - t.update(instance, None); + storage.cache_mut().on_instance_change(instance, None); assert_eq!( - build_instance!(t, None, None, faildoms! {}) + Instance::new(None, None, &faildoms! {}, &storage) .unwrap_err(), "missing failure domain names: OS, PLANET", ); @@ -713,61 +696,61 @@ mod tests { #[::tarantool::test] fn reconfigure_failure_domain() { let storage = Clusterwide::new().unwrap(); - let mut t = new_topology(&storage, instances![], 3); + setup_storage(&storage, instances![], 3); // first instance - let instance1_v1 = build_instance!(t, Some("i1"), None, faildoms! {planet: Earth}).unwrap(); + let instance1_v1 = Instance::new(Some(&InstanceId::from("i1")), None, &faildoms! {planet: Earth}, &storage).unwrap(); storage.instances.put(&instance1_v1).unwrap(); - t.update(instance1_v1.clone(), None); + storage.cache_mut().on_instance_change(instance1_v1.clone(), None); assert_eq!(instance1_v1.failure_domain, faildoms! {planet: Earth}); assert_eq!(instance1_v1.replicaset_id, "r1"); // reconfigure single instance, fail assert_eq!( - set_faildoms!(t, "i1", faildoms! {owner: Ivan}) + set_faildoms(instance1_v1.clone(), &storage, faildoms! {owner: Ivan}) .unwrap_err(), "missing failure domain names: PLANET", ); // reconfigure single instance, success - let instance1_v2 = set_faildoms!(t, "i1", faildoms! {planet: Mars, owner: Ivan}).unwrap(); + let instance1_v2 = set_faildoms(instance1_v1.clone(), &storage, faildoms! {planet: Mars, owner: Ivan}).unwrap(); storage.instances.put(&instance1_v2).unwrap(); - t.update(instance1_v2.clone(), Some(instance1_v1)); + storage.cache_mut().on_instance_change(instance1_v2.clone(), Some(instance1_v1)); assert_eq!(instance1_v2.failure_domain, faildoms! {planet: Mars, owner: Ivan}); assert_eq!(instance1_v2.replicaset_id, "r1"); // same replicaset // second instance won't be joined without the newly added required // failure domain subdivision of "OWNER" assert_eq!( - build_instance!(t, Some("i2"), None, faildoms! {planet: Mars}) + Instance::new(Some(&InstanceId::from("i2")), None, &faildoms! {planet: Mars}, &storage) .unwrap_err(), "missing failure domain names: OWNER", ); // second instance #[rustfmt::skip] - let instance2_v1 = build_instance!(t, Some("i2"), None, faildoms! {planet: Mars, owner: Mike}) + let instance2_v1 = Instance::new(Some(&InstanceId::from("i2")), None, &faildoms! {planet: Mars, owner: Mike}, &storage) .unwrap(); storage.instances.put(&instance2_v1).unwrap(); - t.update(instance2_v1.clone(), None); + storage.cache_mut().on_instance_change(instance2_v1.clone(), None); assert_eq!(instance2_v1.failure_domain, faildoms! {planet: Mars, owner: Mike}); // doesn't fit into r1 assert_eq!(instance2_v1.replicaset_id, "r2"); // reconfigure second instance, success - let instance2_v2 = set_faildoms!(t, "i2", faildoms! {planet: Earth, owner: Mike}).unwrap(); + let instance2_v2 = set_faildoms(instance2_v1.clone(), &storage, faildoms! {planet: Earth, owner: Mike}).unwrap(); storage.instances.put(&instance2_v2).unwrap(); - t.update(instance2_v2.clone(), Some(instance2_v1)); + storage.cache_mut().on_instance_change(instance2_v2.clone(), Some(instance2_v1)); assert_eq!(instance2_v2.failure_domain, faildoms! {planet: Earth, owner: Mike}); // replicaset doesn't change automatically assert_eq!(instance2_v2.replicaset_id, "r2"); // add instance with new subdivision #[rustfmt::skip] - let instance3_v1 = build_instance!(t, Some("i3"), None, faildoms! {planet: B, owner: V, dimension: C137}) + let instance3_v1 = Instance::new(Some(&InstanceId::from("i3")), None, &faildoms! {planet: B, owner: V, dimension: C137}, &storage) .unwrap(); storage.instances.put(&instance3_v1).unwrap(); - t.update(instance3_v1.clone(), None); + storage.cache_mut().on_instance_change(instance3_v1.clone(), None); assert_eq!( instance3_v1.failure_domain, faildoms! {planet: B, owner: V, dimension: C137} @@ -779,7 +762,7 @@ mod tests { // subdivision #[rustfmt::skip] assert_eq!( - build_instance!(t, Some("i4"), None, faildoms! {planet: Theia, owner: Me}) + Instance::new(Some(&InstanceId::from("i4")), None, &faildoms! {planet: Theia, owner: Me}, &storage) .unwrap_err(), "missing failure domain names: DIMENSION", ); diff --git a/src/storage.rs b/src/storage.rs index 25b31fcf3e7c1f11772192775741b86e13cc3984..9b639f5b7af673355121196e06a9c2fb4e9fd875 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -2681,14 +2681,14 @@ pub fn tweak_max_space_id() -> tarantool::Result<()> { // cache //////////////////////////////////////////////////////////////////////////////// -static CACHE: OnceCell<Cache> = OnceCell::new(); +static mut CACHE: OnceCell<Cache> = OnceCell::new(); /// Information that can be derived from [`Clusterwide`] /// but is costly to recalculate. /// /// Should only be mutated, when storage is mutated. #[derive(Debug)] -struct Cache { +pub struct Cache { pub(crate) max_raft_id: RaftId, pub(crate) replicasets: BTreeMap<ReplicasetId, HashSet<crate::instance::InstanceId>>, pub(crate) failure_domain_names: HashSet<Uppercase>, @@ -2719,14 +2719,19 @@ impl Cache { impl Clusterwide { #[inline] pub fn cache(&self) -> &Cache { - CACHE.get_or_init(|| Cache::from(self)) + // SAFETY: this is safe as long as `CACHE` is only accessed from tx thread + unsafe { CACHE.get_or_init(|| Cache::from(self)) } } /// Cache should only be mutated, when storage is mutated. #[inline] + #[allow(clippy::mut_from_ref)] pub fn cache_mut(&self) -> &mut Cache { - CACHE.get_or_init(|| Cache::from(self)); - CACHE.get_mut().expect("just set") + // SAFETY: this is safe as long as `CACHE` is only accessed from tx thread + unsafe { + CACHE.get_or_init(|| Cache::from(self)); + CACHE.get_mut().expect("just set") + } } } diff --git a/src/traft/node.rs b/src/traft/node.rs index d207582480bd1d17a14851134f3fb4331ff8399a..b3c1846bb3455e851633996a3721c6fd120716d6 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -8,11 +8,11 @@ use crate::cas; use crate::governor; use crate::has_grades; +use crate::instance::replication_ids; use crate::instance::Instance; use crate::kvcell::KVCell; use crate::loop_start; use crate::r#loop::FlowControl; -use crate::replicaset::ReplicasetId; use crate::rpc; use crate::schema::{Distribution, IndexDef, SpaceDef}; use crate::storage::acl; @@ -158,6 +158,7 @@ impl std::fmt::Debug for Node { impl Node { /// Initialize the raft node. + /// /// **This function yields** pub fn new(storage: Clusterwide, raft_storage: RaftSpaceAccess) -> Result<Self, RaftError> { let opts = WorkerOptions { @@ -323,24 +324,6 @@ impl Node { }) } - pub fn get_replication_ids(&self, replicaset_id: &ReplicasetId) -> HashSet<RaftId> { - if let Some(replication_ids) = self.storage.cache().replicasets.get(replicaset_id) { - replication_ids - .iter() - .map(|id| { - let instance = self - .storage - .instances - .get(id) - .expect("storage should not fail"); - instance.raft_id - }) - .collect() - } else { - HashSet::new() - } - } - /// Processes the [`rpc::join::Request`] and appends necessary /// entries to the raft log (if successful). /// @@ -365,7 +348,7 @@ impl Node { let mut replication_addresses = self .storage .peer_addresses - .addresses_by_ids(self.get_replication_ids(&instance.replicaset_id))?; + .addresses_by_ids(replication_ids(&instance.replicaset_id, &self.storage))?; replication_addresses.insert(req.advertise_address.clone()); let peer_address = traft::PeerAddress { raft_id: instance.raft_id,