diff --git a/src/failure_domain.rs b/src/failure_domain.rs index 92071c549a4f88f70be8059231be2fc0b95bb1a7..e777099bd7864354e0898abd27ac409c795e1cd4 100644 --- a/src/failure_domain.rs +++ b/src/failure_domain.rs @@ -102,6 +102,23 @@ impl FailureDomain { .filter(|&&key| self.data.get(key) != other.data.get(key)) .count() as u64 } + + /// Check that this failure domain contains all `required_domains`. + pub fn check(&self, required_domains: &HashSet<Uppercase>) -> Result<(), String> { + let mut res = Vec::new(); + for domain_name in required_domains { + if !self.contains_name(domain_name) { + res.push(domain_name.to_string()); + } + } + + if res.is_empty() { + return Ok(()); + } + + res.sort(); + Err(format!("missing failure domain names: {}", res.join(", "))) + } } impl std::fmt::Display for FailureDomain { diff --git a/src/instance.rs b/src/instance.rs index 1af21f476c45ca041fd4dd5815c23b5433b1eefd..8ca6e9819b2494308068cdfa9690fe1a5b103151 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -1,14 +1,8 @@ -use std::collections::HashSet; - -use self::grade::{CurrentGradeVariant, Grade, TargetGradeVariant}; - use super::failure_domain::FailureDomain; use super::replicaset::ReplicasetId; use crate::has_grades; -use crate::rpc; -use crate::storage::Clusterwide; use crate::traft::{instance_uuid, replicaset_uuid, RaftId}; -use crate::util::{Transition, Uppercase}; +use crate::util::Transition; use ::serde::{Deserialize, Serialize}; use ::tarantool::tlua; use ::tarantool::tuple::Encode; @@ -55,117 +49,34 @@ pub struct Instance { impl Encode for Instance {} impl Instance { - /// Constructs new `Instance` as part of the instance joining process. + /// Construct an instance. pub fn new( - instance_id: Option<&InstanceId>, - replicaset_id: Option<&ReplicasetId>, - failure_domain: &FailureDomain, - storage: &Clusterwide, - ) -> Result<Self, String> { - if let Some(id) = instance_id { - let existing_instance = storage.instances.get(id); - if matches!(existing_instance, Ok(instance) if has_grades!(instance, Online -> *)) { - let e = format!("{} is already joined", id); - return Err(e); - } - } - - check_required_failure_domain(failure_domain, &storage.cache().failure_domain_names)?; - - // Anyway, `join` always produces a new raft_id. - let raft_id = storage.cache().max_raft_id + 1; - let instance_id = instance_id - .map(Clone::clone) - .unwrap_or_else(|| choose_instance_id(raft_id, storage)); - let instance_uuid = instance_uuid(&instance_id); - let replicaset_id = replicaset_id - .map(Clone::clone) - .unwrap_or_else(|| choose_replicaset_id(failure_domain, storage)); - let replicaset_uuid = replicaset_uuid(&replicaset_id); - - let instance = Instance { - instance_id, - instance_uuid, - raft_id, - replicaset_id, - replicaset_uuid, - current_grade: CurrentGrade::offline(0), - target_grade: TargetGrade::offline(0), - failure_domain: failure_domain.clone(), - }; - - Ok(instance) - } - - /// Create first instance in the cluster. 
- pub fn initial( - instance_id: Option<InstanceId>, - replicaset_id: Option<ReplicasetId>, + raft_id: Option<RaftId>, + instance_id: Option<impl Into<InstanceId>>, + replicaset_id: Option<impl Into<ReplicasetId>>, + current_grade: CurrentGrade, + target_grade: TargetGrade, failure_domain: FailureDomain, ) -> Self { - let instance_id = instance_id.unwrap_or_else(|| "i1".into()); - let replicaset_id = replicaset_id.unwrap_or_else(|| ReplicasetId::from("r1")); + let instance_id = instance_id.map(Into::into).unwrap_or_else(|| "i1".into()); + let replicaset_id = replicaset_id + .map(Into::into) + .unwrap_or_else(|| ReplicasetId::from("r1")); + let raft_id = raft_id.unwrap_or(1); let instance_uuid = instance_uuid(&instance_id); let replicaset_uuid = replicaset_uuid(&replicaset_id); Self { instance_id, - raft_id: 1, + raft_id, replicaset_id, - current_grade: CurrentGrade::offline(0), - target_grade: TargetGrade::offline(0), + current_grade, + target_grade, failure_domain, instance_uuid, replicaset_uuid, } } - /// Updates existing [`Instance`]. Returns an updated instance. - pub fn update( - &self, - req: &rpc::update_instance::Request, - storage: &Clusterwide, - ) -> Result<Self, String> { - let mut instance = self.clone(); - if self.current_grade == CurrentGradeVariant::Expelled - && !matches!( - req, - rpc::update_instance::Request { - target_grade: None, - current_grade: Some(current_grade), - failure_domain: None, - .. - } if *current_grade == CurrentGradeVariant::Expelled - ) - { - return Err(format!( - "cannot update expelled instance \"{}\"", - self.instance_id - )); - } - - if let Some(fd) = req.failure_domain.as_ref() { - check_required_failure_domain(fd, &storage.cache().failure_domain_names)?; - instance.failure_domain = fd.clone(); - } - - if let Some(value) = req.current_grade { - instance.current_grade = value; - } - - if let Some(variant) = req.target_grade { - let incarnation = match variant { - TargetGradeVariant::Online => self.target_grade.incarnation + 1, - _ => self.current_grade.incarnation, - }; - instance.target_grade = Grade { - variant, - incarnation, - }; - } - - Ok(instance) - } - /// Instance has a grade that implies it may cooperate. /// Currently this means that target_grade is neither Offline nor Expelled. #[inline] @@ -180,92 +91,6 @@ impl Instance { } } -/// Choose [`InstanceId`] based on `raft_id`. -fn choose_instance_id(raft_id: RaftId, storage: &Clusterwide) -> InstanceId { - let mut suffix: Option<u64> = None; - loop { - let ret = match suffix { - None => format!("i{raft_id}"), - Some(x) => format!("i{raft_id}-{x}"), - } - .into(); - - if !storage - .instances - .contains(&ret) - .expect("storage should not fail") - { - return ret; - } - - suffix = Some(suffix.map_or(2, |x| x + 1)); - } -} - -/// Choose a [`ReplicasetId`] for a new instance given its `failure_domain`. 
-fn choose_replicaset_id(failure_domain: &FailureDomain, storage: &Clusterwide) -> ReplicasetId { - 'next_replicaset: for (replicaset_id, instances) in storage.cache().replicasets.iter() { - let replication_factor = storage - .properties - .replication_factor() - .expect("storage should not fail"); - - if instances.len() < replication_factor { - for instance_id in instances { - let instance = storage.instances.get(instance_id).unwrap(); - if instance.failure_domain.intersects(failure_domain) { - continue 'next_replicaset; - } - } - return replicaset_id.clone(); - } - } - - let mut i = 0u64; - loop { - i += 1; - let replicaset_id = ReplicasetId(format!("r{i}")); - if storage.cache().replicasets.get(&replicaset_id).is_none() { - return replicaset_id; - } - } -} - -/// Check that `fd` contains all `required_domains`. -pub fn check_required_failure_domain( - fd: &FailureDomain, - required_domains: &HashSet<Uppercase>, -) -> Result<(), String> { - let mut res = Vec::new(); - for domain_name in required_domains { - if !fd.contains_name(domain_name) { - res.push(domain_name.to_string()); - } - } - - if res.is_empty() { - return Ok(()); - } - - res.sort(); - Err(format!("missing failure domain names: {}", res.join(", "))) -} - -/// Get ids of instances that are part of a replicaset with `replicaset_id`. -pub fn replication_ids(replicaset_id: &ReplicasetId, storage: &Clusterwide) -> HashSet<RaftId> { - if let Some(replication_ids) = storage.cache().replicasets.get(replicaset_id) { - replication_ids - .iter() - .map(|id| { - let instance = storage.instances.get(id).expect("storage should not fail"); - instance.raft_id - }) - .collect() - } else { - HashSet::new() - } -} - impl std::fmt::Display for Instance { #[rustfmt::skip] fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -287,10 +112,10 @@ mod tests { use crate::failure_domain::FailureDomain; use crate::instance::grade::{CurrentGrade, Grade, TargetGrade, TargetGradeVariant}; use crate::replicaset::ReplicasetId; + use crate::rpc::join::{build_instance, replication_ids}; use crate::storage::Clusterwide; - use crate::traft::instance_uuid; - use crate::traft::replicaset_uuid; use crate::rpc; + use crate::rpc::update_instance::update_instance; use super::*; @@ -326,61 +151,30 @@ mod tests { } } - macro_rules! instances { - [ $( ( $($instance:tt)+ ) ),* $(,)? ] => { - vec![$( instance!($($instance)+) ),*] - }; - } - - macro_rules! instance { - ( - $raft_id:expr, - $instance_id:literal, - $replicaset_id:literal, - $current_grade:expr, - $target_grade:expr - $(, $failure_domain:expr)? - $(,)? - ) => { - Instance { - raft_id: $raft_id, - instance_id: $instance_id.into(), - replicaset_id: $replicaset_id.into(), - instance_uuid: instance_uuid($instance_id), - replicaset_uuid: replicaset_uuid($replicaset_id), - - current_grade: $current_grade.into_grade(), - target_grade: $target_grade.into_grade(), - failure_domain: { - let _f = FailureDomain::default(); - $( let _f = $failure_domain; )? 
- _f - }, - } - }; - } - fn set_grade( - instance: Instance, + mut instance: Instance, storage: &Clusterwide, grade: impl ModifyUpdateInstanceRequest, ) -> Result<Instance, String> { let req = rpc::update_instance::Request::new(instance.instance_id.clone(), "".into()); let req = grade.modify(req); - instance.update(&req, storage) + update_instance(&mut instance, &req, storage)?; + Ok(instance) } fn set_faildoms( - instance: Instance, + mut instance: Instance, storage: &Clusterwide, failure_domain: FailureDomain, ) -> Result<Instance, String> { let instance_id = instance.instance_id.clone(); - instance.update( + update_instance( + &mut instance, &rpc::update_instance::Request::new(instance_id, "".into()) .with_failure_domain(failure_domain), storage, - ) + )?; + Ok(instance) } macro_rules! faildoms { @@ -402,34 +196,34 @@ mod tests { let storage = Clusterwide::new().unwrap(); setup_storage(&storage, vec![], 1); - let instance = Instance::new(None, None, &FailureDomain::default(), &storage).unwrap(); + let instance = build_instance(None, None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, - instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); - let instance = Instance::new(None, None, &FailureDomain::default(), &storage).unwrap(); + let instance = build_instance(None, None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, - instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), + Instance::new(Some(2), Some("i2"), Some("r2"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); - let instance = Instance::new(None, Some(&ReplicasetId::from("R3")), &FailureDomain::default(), &storage).unwrap(); + let instance = build_instance(None, Some(&ReplicasetId::from("R3")), &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, - instance!(3, "i3", "R3", CurrentGrade::offline(0), TargetGrade::offline(0)), + Instance::new(Some(3), Some("i3"), Some("R3"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); - let instance = Instance::new(Some(&InstanceId::from("I4")), None, &FailureDomain::default(), &storage).unwrap(); + let instance = build_instance(Some(&InstanceId::from("I4")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, - instance!(4, "I4", "r3", CurrentGrade::offline(0), TargetGrade::offline(0)), + Instance::new(Some(4), Some("I4"), Some("r3"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); @@ -438,9 +232,9 @@ mod tests { #[::tarantool::test] fn test_override() { let storage = Clusterwide::new().unwrap(); - setup_storage(&storage, instances![ - (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), - (2, "i2", "r2-original", CurrentGrade::offline(0), TargetGrade::offline(0)), + setup_storage(&storage, vec![ + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::online(1), TargetGrade::online(1), FailureDomain::default()), + 
Instance::new(Some(2), Some("i2"), Some("r2-original"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ], 2); @@ -450,7 +244,7 @@ mod tests { // unreachable soon (when we implement failover) an the error // will be gone. assert_eq!( - Instance::new(Some(&InstanceId::from("i1")), None, &FailureDomain::default(), &storage) + build_instance(Some(&InstanceId::from("i1")), None, &FailureDomain::default(), &storage) .unwrap_err(), "i1 is already joined", ); @@ -466,8 +260,8 @@ mod tests { // - Even if it's an impostor, rely on auto-expel policy. // Disruption isn't destructive if auto-expel allows (TODO). assert_eq!( - Instance::new(Some(&InstanceId::from("i2")), None, &FailureDomain::default(), &storage).unwrap(), - (instance!(3, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0))), + build_instance(Some(&InstanceId::from("i2")), None, &FailureDomain::default(), &storage).unwrap(), + (Instance::new(Some(3), Some("i2"), Some("r1"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default())), // Attention: generated replicaset_id differs from the // original one, as well as raft_id. // That's a desired behavior. @@ -490,58 +284,58 @@ mod tests { #[::tarantool::test] fn test_instance_id_collision() { let storage = Clusterwide::new().unwrap(); - setup_storage(&storage, instances![ - (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), - (2, "i3", "r3", CurrentGrade::online(1), TargetGrade::online(1)), + setup_storage(&storage, vec![ + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::online(1), TargetGrade::online(1), FailureDomain::default()), + Instance::new(Some(2), Some("i3"), Some("r3"), CurrentGrade::online(1), TargetGrade::online(1), FailureDomain::default()), // Attention: i3 has raft_id=2 ], 1); assert_eq!( - Instance::new(None, Some(&ReplicasetId::from("r2")), &FailureDomain::default(), &storage).unwrap(), - instance!(3, "i3-2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), + build_instance(None, Some(&ReplicasetId::from("r2")), &FailureDomain::default(), &storage).unwrap(), + Instance::new(Some(3), Some("i3-2"), Some("r2"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); } #[::tarantool::test] fn test_replication_factor() { let storage = Clusterwide::new().unwrap(); - setup_storage(&storage, instances![ - (9, "i9", "r9", CurrentGrade::online(1), TargetGrade::online(1)), - (10, "i10", "r9", CurrentGrade::online(1), TargetGrade::online(1)), + setup_storage(&storage, vec![ + Instance::new(Some(9), Some("i9"), Some("r9"), CurrentGrade::online(1), TargetGrade::online(1), FailureDomain::default()), + Instance::new(Some(10), Some("i10"), Some("r9"), CurrentGrade::online(1), TargetGrade::online(1), FailureDomain::default()), ], 2); - let instance = Instance::new(Some(&InstanceId::from("i1")), None, &FailureDomain::default(), &storage).unwrap(); + let instance = build_instance(Some(&InstanceId::from("i1")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, - instance!(11, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), + Instance::new(Some(11), Some("i1"), Some("r1"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); assert_eq!(replication_ids(&ReplicasetId::from("r1"), &storage), HashSet::from([11])); - let instance = Instance::new(Some(&InstanceId::from("i2")), None, &FailureDomain::default(), 
&storage).unwrap(); + let instance = build_instance(Some(&InstanceId::from("i2")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, - instance!(12, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), + Instance::new(Some(12), Some("i2"), Some("r1"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); assert_eq!(replication_ids(&ReplicasetId::from("r1"), &storage), HashSet::from([11, 12])); - let instance = Instance::new(Some(&InstanceId::from("i3")), None, &FailureDomain::default(), &storage).unwrap(); + let instance = build_instance(Some(&InstanceId::from("i3")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, - instance!(13, "i3", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), + Instance::new(Some(13), Some("i3"), Some("r2"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); assert_eq!(replication_ids(&ReplicasetId::from("r2"), &storage), HashSet::from([13])); - let instance = Instance::new(Some(&InstanceId::from("i4")), None, &FailureDomain::default(), &storage).unwrap(); + let instance = build_instance(Some(&InstanceId::from("i4")), None, &FailureDomain::default(), &storage).unwrap(); assert_eq!( instance, - instance!(14, "i4", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), + Instance::new(Some(14), Some("i4"), Some("r2"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); @@ -551,7 +345,7 @@ mod tests { #[::tarantool::test] fn test_update_grade() { let storage = Clusterwide::new().unwrap(); - let instance_v0 = instance!(1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)); + let instance_v0 = Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::online(1), TargetGrade::online(1), FailureDomain::default()); setup_storage(&storage, vec![instance_v0.clone()], 1); // Current grade incarnation is allowed to go down, @@ -561,7 +355,7 @@ mod tests { storage.cache_mut().on_instance_change(instance_v1.clone(), Some(instance_v0)); assert_eq!( instance_v1, - instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::offline(0), TargetGrade::online(1), FailureDomain::default()) ); // idempotency @@ -570,7 +364,7 @@ mod tests { storage.cache_mut().on_instance_change(instance_v2.clone(), Some(instance_v1)); assert_eq!( instance_v2, - instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::offline(0), TargetGrade::online(1), FailureDomain::default()) ); // TargetGradeVariant::Offline takes incarnation from current grade @@ -579,7 +373,7 @@ mod tests { storage.cache_mut().on_instance_change(instance_v3.clone(), Some(instance_v2)); assert_eq!( instance_v3, - instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::offline(0), TargetGrade::offline(0), FailureDomain::default()), ); // TargetGradeVariant::Online increases incarnation @@ -588,7 +382,7 @@ mod tests { storage.cache_mut().on_instance_change(instance_v4.clone(), Some(instance_v3)); assert_eq!( instance_v4, - 
instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::offline(0), TargetGrade::online(1), FailureDomain::default()) ); // No idempotency, incarnation goes up @@ -597,7 +391,7 @@ mod tests { storage.cache_mut().on_instance_change(instance_v5.clone(), Some(instance_v4)); assert_eq!( instance_v5, - instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(2)), + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::offline(0), TargetGrade::online(2), FailureDomain::default()) ); // TargetGrade::Expelled takes incarnation from current grade @@ -606,7 +400,7 @@ mod tests { storage.cache_mut().on_instance_change(instance_v6.clone(), Some(instance_v5)); assert_eq!( instance_v6, - instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::expelled(0)), + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::offline(0), TargetGrade::expelled(0), FailureDomain::default()), ); // Instance get's expelled @@ -615,7 +409,7 @@ mod tests { storage.cache_mut().on_instance_change(instance_v7.clone(), Some(instance_v6)); assert_eq!( instance_v7, - instance!(1, "i1", "r1", CurrentGrade::expelled(69), TargetGrade::expelled(0)), + Instance::new(Some(1), Some("i1"), Some("r1"), CurrentGrade::expelled(69), TargetGrade::expelled(0), FailureDomain::default()), ); // Updating expelled instances isn't allowed @@ -631,69 +425,69 @@ mod tests { setup_storage(&storage, vec![], 3); let instance = - Instance::new(None, None, &faildoms! {planet: Earth}, &storage) + build_instance(None, None, &faildoms! {planet: Earth}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r1"); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); let instance = - Instance::new(None, None, &faildoms! {planet: Earth}, &storage) + build_instance(None, None, &faildoms! {planet: Earth}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r2"); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); let instance = - Instance::new(None, None, &faildoms! {planet: Mars}, &storage) + build_instance(None, None, &faildoms! {planet: Mars}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r1"); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); let instance = - Instance::new(None, None, &faildoms! {planet: Earth, os: BSD}, &storage) + build_instance(None, None, &faildoms! {planet: Earth, os: BSD}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r3"); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); let instance = - Instance::new(None, None, &faildoms! {planet: Mars, os: BSD}, &storage) + build_instance(None, None, &faildoms! {planet: Mars, os: BSD}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r2"); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); assert_eq!( - Instance::new(None, None, &faildoms! {os: Arch}, &storage) + build_instance(None, None, &faildoms! {os: Arch}, &storage) .unwrap_err(), "missing failure domain names: PLANET", ); let instance = - Instance::new(None, None, &faildoms! {planet: Venus, os: Arch}, &storage) + build_instance(None, None, &faildoms! 
{planet: Venus, os: Arch}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r1"); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); let instance = - Instance::new(None, None, &faildoms! {planet: Venus, os: Mac}, &storage) + build_instance(None, None, &faildoms! {planet: Venus, os: Mac}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r2"); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); let instance = - Instance::new(None, None, &faildoms! {planet: Mars, os: Mac}, &storage) + build_instance(None, None, &faildoms! {planet: Mars, os: Mac}, &storage) .unwrap(); assert_eq!(instance.replicaset_id, "r3"); storage.instances.put(&instance).unwrap(); storage.cache_mut().on_instance_change(instance, None); assert_eq!( - Instance::new(None, None, &faildoms! {}, &storage) + build_instance(None, None, &faildoms! {}, &storage) .unwrap_err(), "missing failure domain names: OS, PLANET", ); @@ -702,10 +496,10 @@ mod tests { #[::tarantool::test] fn reconfigure_failure_domain() { let storage = Clusterwide::new().unwrap(); - setup_storage(&storage, instances![], 3); + setup_storage(&storage, vec![], 3); // first instance - let instance1_v1 = Instance::new(Some(&InstanceId::from("i1")), None, &faildoms! {planet: Earth}, &storage).unwrap(); + let instance1_v1 = build_instance(Some(&InstanceId::from("i1")), None, &faildoms! {planet: Earth}, &storage).unwrap(); storage.instances.put(&instance1_v1).unwrap(); storage.cache_mut().on_instance_change(instance1_v1.clone(), None); assert_eq!(instance1_v1.failure_domain, faildoms! {planet: Earth}); @@ -728,14 +522,14 @@ mod tests { // second instance won't be joined without the newly added required // failure domain subdivision of "OWNER" assert_eq!( - Instance::new(Some(&InstanceId::from("i2")), None, &faildoms! {planet: Mars}, &storage) + build_instance(Some(&InstanceId::from("i2")), None, &faildoms! {planet: Mars}, &storage) .unwrap_err(), "missing failure domain names: OWNER", ); // second instance #[rustfmt::skip] - let instance2_v1 = Instance::new(Some(&InstanceId::from("i2")), None, &faildoms! {planet: Mars, owner: Mike}, &storage) + let instance2_v1 = build_instance(Some(&InstanceId::from("i2")), None, &faildoms! {planet: Mars, owner: Mike}, &storage) .unwrap(); storage.instances.put(&instance2_v1).unwrap(); storage.cache_mut().on_instance_change(instance2_v1.clone(), None); @@ -753,7 +547,7 @@ mod tests { // add instance with new subdivision #[rustfmt::skip] - let instance3_v1 = Instance::new(Some(&InstanceId::from("i3")), None, &faildoms! {planet: B, owner: V, dimension: C137}, &storage) + let instance3_v1 = build_instance(Some(&InstanceId::from("i3")), None, &faildoms! {planet: B, owner: V, dimension: C137}, &storage) .unwrap(); storage.instances.put(&instance3_v1).unwrap(); storage.cache_mut().on_instance_change(instance3_v1.clone(), None); @@ -768,7 +562,7 @@ mod tests { // subdivision #[rustfmt::skip] assert_eq!( - Instance::new(Some(&InstanceId::from("i4")), None, &faildoms! {planet: Theia, owner: Me}, &storage) + build_instance(Some(&InstanceId::from("i4")), None, &faildoms! 
{planet: Theia, owner: Me}, &storage) .unwrap_err(), "missing failure domain names: DIMENSION", ); diff --git a/src/lib.rs b/src/lib.rs index 51405c105a959112ea1fb31d956bf9bcc7e35a68..4c3fc67016a61633193d1ccac96fe1fc0e388ee9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,7 @@ use traft::RaftSpaceAccess; use protobuf::Message as _; use crate::args::Address; -use crate::instance::grade::TargetGradeVariant; +use crate::instance::grade::{CurrentGrade, TargetGrade, TargetGradeVariant}; use crate::instance::Instance; use crate::traft::op; use crate::traft::LogicalClock; @@ -355,9 +355,12 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { fn start_boot(args: &args::Run) { tlog!(Info, ">>>>> start_boot()"); - let instance = Instance::initial( + let instance = Instance::new( + None, args.instance_id.clone(), args.replicaset_id.clone(), + CurrentGrade::offline(0), + TargetGrade::offline(0), args.failure_domain(), ); let raft_id = instance.raft_id; diff --git a/src/rpc/join.rs b/src/rpc/join.rs index 077a8aaa4891eb26d501496a14a102d844ec67e6..1f2876d1a8b5a1ca9572505e19ac055787366a1e 100644 --- a/src/rpc/join.rs +++ b/src/rpc/join.rs @@ -1,14 +1,16 @@ +use std::collections::HashSet; use std::time::Duration; use crate::cas; use crate::failure_domain::FailureDomain; -use crate::instance::replication_ids; +use crate::has_grades; +use crate::instance::grade::{CurrentGrade, TargetGrade}; use crate::instance::{Instance, InstanceId}; use crate::replicaset::ReplicasetId; -use crate::storage::ToEntryIter as _; +use crate::storage::{Clusterwide, ToEntryIter as _}; use crate::storage::{ClusterwideSpaceId, PropertyName}; -use crate::traft; use crate::traft::op::{Dml, Op}; +use crate::traft::{self, RaftId}; use crate::traft::{error::Error, node, Address, PeerAddress, Result}; use ::tarantool::fiber; @@ -74,7 +76,7 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R let deadline = fiber::clock().saturating_add(timeout); loop { - let instance = Instance::new( + let instance = build_instance( req.instance_id.as_ref(), req.replicaset_id.as_ref(), &req.failure_domain, @@ -152,3 +154,105 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R }); } } + +pub fn build_instance( + instance_id: Option<&InstanceId>, + replicaset_id: Option<&ReplicasetId>, + failure_domain: &FailureDomain, + storage: &Clusterwide, +) -> std::result::Result<Instance, String> { + if let Some(id) = instance_id { + let existing_instance = storage.instances.get(id); + if matches!(existing_instance, Ok(instance) if has_grades!(instance, Online -> *)) { + let e = format!("{} is already joined", id); + return Err(e); + } + } + + failure_domain.check(&storage.cache().failure_domain_names)?; + + // Anyway, `join` always produces a new raft_id. + let raft_id = storage.cache().max_raft_id + 1; + let instance_id = instance_id + .map(Clone::clone) + .unwrap_or_else(|| choose_instance_id(raft_id, storage)); + let replicaset_id = replicaset_id + .map(Clone::clone) + .unwrap_or_else(|| choose_replicaset_id(failure_domain, storage)); + + let instance = Instance::new( + Some(raft_id), + Some(instance_id), + Some(replicaset_id), + CurrentGrade::offline(0), + TargetGrade::offline(0), + failure_domain.clone(), + ); + Ok(instance) +} + +/// Choose [`InstanceId`] based on `raft_id`. 
+fn choose_instance_id(raft_id: RaftId, storage: &Clusterwide) -> InstanceId { + let mut suffix: Option<u64> = None; + loop { + let ret = match suffix { + None => format!("i{raft_id}"), + Some(x) => format!("i{raft_id}-{x}"), + } + .into(); + + if !storage + .instances + .contains(&ret) + .expect("storage should not fail") + { + return ret; + } + + suffix = Some(suffix.map_or(2, |x| x + 1)); + } +} + +/// Choose a [`ReplicasetId`] for a new instance given its `failure_domain`. +fn choose_replicaset_id(failure_domain: &FailureDomain, storage: &Clusterwide) -> ReplicasetId { + 'next_replicaset: for (replicaset_id, instances) in storage.cache().replicasets.iter() { + let replication_factor = storage + .properties + .replication_factor() + .expect("storage should not fail"); + + if instances.len() < replication_factor { + for instance_id in instances { + let instance = storage.instances.get(instance_id).unwrap(); + if instance.failure_domain.intersects(failure_domain) { + continue 'next_replicaset; + } + } + return replicaset_id.clone(); + } + } + + let mut i = 0u64; + loop { + i += 1; + let replicaset_id = ReplicasetId(format!("r{i}")); + if storage.cache().replicasets.get(&replicaset_id).is_none() { + return replicaset_id; + } + } +} + +/// Get ids of instances that are part of a replicaset with `replicaset_id`. +pub fn replication_ids(replicaset_id: &ReplicasetId, storage: &Clusterwide) -> HashSet<RaftId> { + if let Some(replication_ids) = storage.cache().replicasets.get(replicaset_id) { + replication_ids + .iter() + .map(|id| { + let instance = storage.instances.get(id).expect("storage should not fail"); + instance.raft_id + }) + .collect() + } else { + HashSet::new() + } +} diff --git a/src/rpc/update_instance.rs b/src/rpc/update_instance.rs index 6e9226b327034ae18297a8f0ead178e400a04aef..d109f450cf80b25edf5f29246736aaf2e8ef3286 100644 --- a/src/rpc/update_instance.rs +++ b/src/rpc/update_instance.rs @@ -2,9 +2,9 @@ use std::time::Duration; use crate::cas; use crate::failure_domain::FailureDomain; -use crate::instance::grade::{CurrentGrade, TargetGradeVariant}; -use crate::instance::InstanceId; -use crate::storage::{ClusterwideSpaceId, PropertyName}; +use crate::instance::grade::{CurrentGrade, CurrentGradeVariant, Grade, TargetGradeVariant}; +use crate::instance::{Instance, InstanceId}; +use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName}; use crate::traft::op::{Dml, Op}; use crate::traft::Result; use crate::traft::{error::Error, node}; @@ -97,11 +97,8 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration) let deadline = fiber::clock().saturating_add(timeout); loop { - let instance = storage - .instances - .get(&req.instance_id)? - .update(&req, storage) - .map_err(raft::Error::ConfChangeError)?; + let mut instance = storage.instances.get(&req.instance_id)?; + update_instance(&mut instance, &req, storage).map_err(raft::Error::ConfChangeError)?; let dml = Dml::replace(ClusterwideSpaceId::Instance, &instance) .expect("encoding should not fail"); @@ -142,3 +139,49 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration) return Ok(()); } } + +/// Updates existing [`Instance`]. +pub fn update_instance( + instance: &mut Instance, + req: &Request, + storage: &Clusterwide, +) -> std::result::Result<(), String> { + if instance.current_grade == CurrentGradeVariant::Expelled + && !matches!( + req, + Request { + target_grade: None, + current_grade: Some(current_grade), + failure_domain: None, + .. 
+ } if *current_grade == CurrentGradeVariant::Expelled + ) + { + return Err(format!( + "cannot update expelled instance \"{}\"", + instance.instance_id + )); + } + + if let Some(fd) = req.failure_domain.as_ref() { + fd.check(&storage.cache().failure_domain_names)?; + instance.failure_domain = fd.clone(); + } + + if let Some(value) = req.current_grade { + instance.current_grade = value; + } + + if let Some(variant) = req.target_grade { + let incarnation = match variant { + TargetGradeVariant::Online => instance.target_grade.incarnation + 1, + _ => instance.current_grade.incarnation, + }; + instance.target_grade = Grade { + variant, + incarnation, + }; + } + + Ok(()) +}
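
Reviewer note on the new `FailureDomain::check`: it is a move of the former free function `check_required_failure_domain` out of src/instance.rs, so the join and update paths now share one validation point. Below is a minimal standalone sketch of the same semantics, using plain std types in place of the project's `Uppercase` wrapper (names are assumed to be already upper-cased here); it is an illustration, not the project's API.

use std::collections::{HashMap, HashSet};

/// Standalone sketch of what `FailureDomain::check` enforces: every required
/// (upper-cased) domain name must be present as a key of the failure domain.
fn check(domain: &HashMap<String, String>, required: &HashSet<String>) -> Result<(), String> {
    let mut missing: Vec<String> = required
        .iter()
        .filter(|name| !domain.contains_key(*name))
        .cloned()
        .collect();
    if missing.is_empty() {
        return Ok(());
    }
    // Sorting keeps the error message deterministic; the tests compare it literally.
    missing.sort();
    Err(format!("missing failure domain names: {}", missing.join(", ")))
}

fn main() {
    // Mirrors the `{os: Arch}` case in the composite_failure_domain test:
    // PLANET is required but absent, so only it is reported.
    let domain = HashMap::from([("OS".to_string(), "ARCH".to_string())]);
    let required = HashSet::from(["PLANET".to_string(), "OS".to_string()]);
    assert_eq!(
        check(&domain, &required).unwrap_err(),
        "missing failure domain names: PLANET",
    );
}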
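
The `has_grades!(instance, Online -> *)` guard in `build_instance` is what test_override exercises: reusing an instance_id is refused only while the stored record's current grade is Online; an Offline record may be superseded, and the superseding join still gets a fresh raft_id and a newly chosen replicaset. A rough standalone sketch of just that guard follows (simplified stand-in types, not the project's real `has_grades!` machinery):

/// Only an Online current grade blocks a re-join at this point.
#[derive(PartialEq)]
#[allow(dead_code)] // Expelled listed for completeness; it does not trip this check.
enum CurrentGradeVariant { Offline, Online, Expelled }

struct StoredInstance { current_grade: CurrentGradeVariant }

fn join_blocked(existing: Option<&StoredInstance>, instance_id: &str) -> Result<(), String> {
    match existing {
        Some(i) if i.current_grade == CurrentGradeVariant::Online => {
            Err(format!("{instance_id} is already joined"))
        }
        // Anything else (no record, or a non-Online record) may proceed;
        // the new instance still receives a brand-new raft_id.
        _ => Ok(()),
    }
}

fn main() {
    let online = StoredInstance { current_grade: CurrentGradeVariant::Online };
    let offline = StoredInstance { current_grade: CurrentGradeVariant::Offline };

    assert_eq!(join_blocked(Some(&online), "i1"), Err("i1 is already joined".to_string()));
    assert_eq!(join_blocked(Some(&offline), "i2"), Ok(()));
    assert_eq!(join_blocked(None, "i5"), Ok(()));
}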
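
`choose_instance_id` and `choose_replicaset_id` move into src/rpc/join.rs unchanged, so a summary of what they do may help when reviewing the new module: instance ids are picked as "i{raft_id}", falling back to "i{raft_id}-2", "i{raft_id}-3", ... on collision (hence "i3-2" in test_instance_id_collision), and a new instance lands in the first replicaset that is below the replication factor and whose members' failure domains do not intersect the new one; otherwise a fresh "r{i}" is opened. A self-contained sketch of that selection logic, with failure domains modelled as plain name→value maps (intersection = some shared name with an equal value) and an ordered map standing in for the replicaset cache:

use std::collections::{BTreeMap, HashMap, HashSet};

/// Sketch of `choose_instance_id`: try "i{raft_id}", then suffix -2, -3, ...
fn choose_instance_id(raft_id: u64, taken: &HashSet<String>) -> String {
    let mut suffix: Option<u64> = None;
    loop {
        let candidate = match suffix {
            None => format!("i{raft_id}"),
            Some(x) => format!("i{raft_id}-{x}"),
        };
        if !taken.contains(&candidate) {
            return candidate;
        }
        suffix = Some(suffix.map_or(2, |x| x + 1));
    }
}

/// Sketch of `choose_replicaset_id`: fill an existing replicaset while it is
/// below the replication factor and no member shares a failure-domain value
/// with the newcomer; otherwise allocate the first unused "r{i}".
fn choose_replicaset_id(
    new_domain: &HashMap<String, String>,
    replicasets: &BTreeMap<String, Vec<HashMap<String, String>>>,
    replication_factor: usize,
) -> String {
    'next_replicaset: for (replicaset_id, members) in replicasets {
        if members.len() < replication_factor {
            for member_domain in members {
                let intersects = member_domain
                    .iter()
                    .any(|(name, value)| new_domain.get(name) == Some(value));
                if intersects {
                    continue 'next_replicaset;
                }
            }
            return replicaset_id.clone();
        }
    }
    let mut i = 0u64;
    loop {
        i += 1;
        let replicaset_id = format!("r{i}");
        if !replicasets.contains_key(&replicaset_id) {
            return replicaset_id;
        }
    }
}

fn main() {
    // Mirrors test_instance_id_collision: "i3" is taken, so raft_id 3 gets "i3-2".
    let taken = HashSet::from(["i1".to_string(), "i3".to_string()]);
    assert_eq!(choose_instance_id(3, &taken), "i3-2");

    // An empty cluster opens "r1" regardless of the replication factor.
    let replicasets: BTreeMap<String, Vec<HashMap<String, String>>> = BTreeMap::new();
    assert_eq!(choose_replicaset_id(&HashMap::new(), &replicasets, 2), "r1");
}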
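
`update_instance` (formerly `Instance::update`) keeps the same grade rules: an Expelled instance only accepts a request that does nothing but confirm current_grade = Expelled, and a requested target grade of Online bumps the target incarnation while any other variant is pinned to the current grade's incarnation. A small sketch of the incarnation bookkeeping with simplified stand-in types (not the project's real grade API), checked against the sequence in test_update_grade:

/// Simplified stand-ins for the grade types, just to illustrate the
/// incarnation rules applied by `update_instance`.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Variant { Offline, Online, Expelled }

#[derive(Clone, Copy, PartialEq, Debug)]
struct Grade { variant: Variant, incarnation: u64 }

/// Compute the next target grade for a requested variant:
/// * Online        -> previous target incarnation + 1 (each request is a new attempt)
/// * anything else -> pinned to the current grade's incarnation
fn next_target_grade(current: Grade, target: Grade, requested: Variant) -> Grade {
    let incarnation = match requested {
        Variant::Online => target.incarnation + 1,
        _ => current.incarnation,
    };
    Grade { variant: requested, incarnation }
}

fn main() {
    let current = Grade { variant: Variant::Offline, incarnation: 0 };
    let target = Grade { variant: Variant::Online, incarnation: 1 };

    // Mirrors test_update_grade: requesting target Offline copies incarnation 0
    // from the current grade ...
    let offline = next_target_grade(current, target, Variant::Offline);
    assert_eq!(offline, Grade { variant: Variant::Offline, incarnation: 0 });

    // ... while requesting Online is not idempotent: the incarnation goes up
    // on every request.
    let online = next_target_grade(current, offline, Variant::Online);
    assert_eq!(online, Grade { variant: Variant::Online, incarnation: 1 });
    let online_again = next_target_grade(current, online, Variant::Online);
    assert_eq!(online_again, Grade { variant: Variant::Online, incarnation: 2 });

    // Expelled, like Offline, takes the current grade's incarnation.
    let expelled = next_target_grade(current, online_again, Variant::Expelled);
    assert_eq!(expelled, Grade { variant: Variant::Expelled, incarnation: 0 });
}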