diff --git a/src/failure_domain.rs b/src/failure_domain.rs index 85e9dfb645fbbd2ea8b2cdac9bb90859f88cc527..1b1dbd36eda16e112d79c62a55f500f307dac708 100644 --- a/src/failure_domain.rs +++ b/src/failure_domain.rs @@ -1,6 +1,7 @@ use crate::stringify_debug; +use crate::traft::Distance; use crate::util::Uppercase; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; //////////////////////////////////////////////////////////////////////////////// /// Failure domains of a given instance. @@ -32,6 +33,17 @@ impl FailureDomain { } false } + + /// Calculate distance between two `FailureDomain`. + /// `Distance` is property of metric space implicitly set by `FailureDomain` keys (axis) and values + pub fn distance(&self, other: &Self) -> Distance { + let mut keys: HashSet<&Uppercase> = HashSet::new(); + keys.extend(self.names()); + keys.extend(other.names()); + keys.iter() + .filter(|&&key| self.data.get(key) != other.data.get(key)) + .count() as u64 + } } impl std::fmt::Display for FailureDomain { diff --git a/src/governor/cc.rs b/src/governor/cc.rs index eb4dcd7b3adf347717205af248ae4026821455cc..20da77e9044e3e0153f809d7a625f30147395a4f 100644 --- a/src/governor/cc.rs +++ b/src/governor/cc.rs @@ -1,13 +1,12 @@ use ::raft::prelude as raft; use ::raft::prelude::ConfChangeType::*; -use std::collections::BTreeMap; -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use crate::has_grades; use crate::instance::grade::TargetGradeVariant; use crate::instance::Instance; -use crate::traft::RaftId; +use crate::traft::{Distance, RaftId}; struct RaftConf<'a> { all: BTreeMap<RaftId, &'a Instance>, @@ -47,6 +46,28 @@ impl<'a> RaftConf<'a> { } } +/// Sum of failure domain distances between `a` and each of `bs` +fn sum_distance(all: &BTreeMap<RaftId, &Instance>, a: &RaftId, bs: &BTreeSet<RaftId>) -> Distance { + let Some(a) = all.get(a) else { return 0 }; + bs.iter() + .filter_map(|raft_id| all.get(raft_id)) + .map(|b| b.failure_domain.distance(&a.failure_domain)) + .sum() +} + +/// From `candidates` find one with maximum total distance to `voters` +fn find_farthest( + all: &BTreeMap<RaftId, &Instance>, + voters: &BTreeSet<RaftId>, + candidates: &BTreeSet<RaftId>, +) -> Option<(RaftId, Distance)> { + candidates + .iter() + .filter(|&&raft_id| !voters.contains(&raft_id)) + .map(|&raft_id| (raft_id, sum_distance(all, &raft_id, voters))) + .reduce(|acc, item| if item.1 > acc.1 { item } else { acc }) +} + pub(crate) fn raft_conf_change( instances: &[Instance], voters: &[RaftId], @@ -74,6 +95,7 @@ pub(crate) fn raft_conf_change( }; // Remove / replace voters + let mut next_voters: BTreeSet<&RaftId> = BTreeSet::new(); for voter_id in raft_conf.voters.clone().iter() { let Some(instance) = raft_conf.all.get(voter_id) else { // Nearly impossible, but rust forces me to check it. @@ -88,14 +110,14 @@ pub(crate) fn raft_conf_change( TargetGradeVariant::Offline => { // A voter goes offline. Replace it with // another online instance if possible. - let Some(replacement) = instances.iter().find(|instance| { + let Some(next_voter) = instances.iter().find(|instance| { has_grades!(instance, Online -> Online) && !raft_conf.voters.contains(&instance.raft_id) + && !next_voters.contains(&instance.raft_id) }) else { continue }; - - let ccs1 = raft_conf.change_single(AddLearnerNode, instance.raft_id); - let ccs2 = raft_conf.change_single(AddNode, replacement.raft_id); - changes.extend_from_slice(&[ccs1, ccs2]); + next_voters.insert(&next_voter.raft_id); + let ccs = raft_conf.change_single(AddLearnerNode, instance.raft_id); + changes.push(ccs); } TargetGradeVariant::Expelled => { // Expelled instance is removed unconditionally. @@ -132,18 +154,40 @@ pub(crate) fn raft_conf_change( } } + let remembered_voters = raft_conf.voters.clone(); + // Promote more voters - for instance in instances + let candidates: BTreeSet<_> = instances .iter() .filter(|instance| has_grades!(instance, Online -> Online)) - { - if raft_conf.voters.len() >= voters_needed { + .map(|p| p.raft_id) + .collect(); + while raft_conf.voters.len() < voters_needed { + if let Some((new_voter_id, _)) = + find_farthest(&raft_conf.all, &raft_conf.voters, &candidates) + { + let ccs = raft_conf.change_single(AddNode, new_voter_id); + changes.push(ccs); + } else { break; } + } - if !raft_conf.voters.contains(&instance.raft_id) { - let ccs = raft_conf.change_single(AddNode, instance.raft_id); - changes.push(ccs); + // Redistributing existing voters according to failure domains + for voter_id in remembered_voters { + let mut other_voters = raft_conf.voters.clone(); + other_voters.remove(&voter_id); + if let Some((new_voter_id, new_distance)) = + find_farthest(&raft_conf.all, &other_voters, &candidates) + { + if new_distance > sum_distance(&raft_conf.all, &voter_id, &other_voters) { + let ccs1 = raft_conf.change_single(AddLearnerNode, voter_id); + let ccs2 = raft_conf.change_single(AddNode, new_voter_id); + changes.push(ccs1); + changes.push(ccs2); + } + } else { + break; } } @@ -175,15 +219,24 @@ mod tests { use super::*; use ::raft::prelude as raft; + use std::collections::{BTreeMap, BTreeSet}; + use crate::failure_domain::FailureDomain; use crate::instance::grade::{CurrentGradeVariant, Grade}; use crate::traft::RaftId; + macro_rules! fd { + ($(,)?) => { FailureDomain::default() }; + ($($k:tt : $v:tt),+ $(,)?) => { + FailureDomain::from([$((stringify!($k), stringify!($v))),+]) + } + } + macro_rules! p { ( $raft_id:literal, - $current_grade:ident -> - $target_grade:ident + $current_grade:ident -> $target_grade:ident + $(, $failure_domain:expr)? ) => { Instance { raft_id: $raft_id, @@ -197,6 +250,7 @@ mod tests { // raft_conf_change doesn't care about incarnations incarnation: 0, }, + $(failure_domain: $failure_domain,)? ..Instance::default() } }; @@ -204,8 +258,10 @@ mod tests { ( $raft_id:literal, $grade:ident + $(, $failure_domain:expr)? + ) => { - p!($raft_id, $grade -> $grade) + p!($raft_id, $grade -> $grade $(, $failure_domain)?) }; } @@ -234,7 +290,59 @@ mod tests { } #[test] - fn conf_change() { + fn test_sum_distance() { + assert_eq!( + 3, + sum_distance( + &BTreeMap::from([ + (1, &p!(1, Online, fd! {x: A, y: B})), + (2, &p!(2, Online, fd! {x: A, y: C})), + (3, &p!(3, Online, fd! {x: D, y: C})) + ]), + &1, + &BTreeSet::from([2, 3]) + ) + ); + assert_eq!( + 0, + sum_distance( + &BTreeMap::from([(1, &p!(1, Online, fd! {x: A, y: B}))]), + &1, + &BTreeSet::new() + ) + ); + } + + #[test] + fn test_find_farthest() { + let instances = [ + p!(1, Online, fd! {dc: Msk, srv: Msk1}), + p!(2, Online, fd! {dc: Msk, srv: Msk2}), + p!(3, Online, fd! {dc: Msk, srv: Msk3}), + p!(4, Online, fd! {dc: Spb, srv: Spb1}), + p!(5, Online, fd! {dc: Spb, srv: Spb2}), + p!(6, Online, fd! {dc: Spb, srv: Spb3}), + p!(7, Online, fd! {dc: Arb, srv: Arb1}), + ]; + let map: BTreeMap<RaftId, &Instance> = instances.iter().map(|p| (p.raft_id, p)).collect(); + let (found_raft_id, found_distance) = find_farthest( + &map, + &BTreeSet::from([7, 1]), + &BTreeSet::from([2, 3, 4, 5, 6]), + ) + .unwrap(); + let expected = [4, 5, 6]; + assert!(BTreeSet::<RaftId>::from(expected).contains(&found_raft_id)); + assert_eq!(4, found_distance); + + assert_eq!( + None, + find_farthest(&map, &BTreeSet::from([7, 1]), &BTreeSet::from([])) + ); + } + + #[test] + fn test_conf_change() { let p1 = || p!(1, Online); let p2 = || p!(2, Online); let p3 = || p!(3, Online); @@ -367,5 +475,75 @@ mod tests { // Voter p2 goes offline, but there's no replacement. None ); + + assert_eq!( + cc( + &[ + p!(1, Online, fd! {dc: Arb}), + p!(2, Online, fd! {dc: Msk}), + p!(3, Online, fd! {dc: Msk}), + p!(4, Online, fd! {dc: Msk}), + p!(5, Online, fd! {dc: Spb}), + p!(6, Offline, fd! {dc: Spb}), + p!(7, Online, fd! {dc: Spb}), + ], + &[1, 2, 3, 5], + &[4, 7] + ), + // New voter should respect failure domain + cc![AddLearnerNode(6), AddNode(7)] + ); + + assert_eq!( + cc( + &[ + p!(1, Online, fd! {dc: Arb}), + p!(2, Online, fd! {dc: Msk}), + p!(3, Online, fd! {dc: Msk}), + p!(4, Online, fd! {dc: Msk}), + p!(5, Online, fd! {dc: Msk}), + p!(6, Online, fd! {dc: Spb}), + p!(7, Online, fd! {dc: Spb}), + p!(8, Online, fd! {dc: Spb}), + p!(9, Online, fd! {dc: Spb}), + ], + &[1, 2, 3, 4, 5], + &[6, 7, 8, 9] + ), + // Existing voters should be redistributed according to failure domains + cc![AddLearnerNode(2), AddLearnerNode(3), AddNode(6), AddNode(7)] + ); + + assert_eq!( + cc( + &[ + p!(1, Online, fd! {x: _1, y: _5}), + p!(2, Online, fd! {x: _2, y: _2}), + p!(3, Online, fd! {x: _3, y: _3}), + p!(4, Online, fd! {x: _4, y: _4}), + p!(5, Online, fd! {x: _5, y: _5}), + p!(6, Online, fd! {x: _1, y: _2}), + ], + &[2, 3, 4, 5, 6], + &[1] + ), + // Voter should not be replaced if candidate has same distance to other voters + None + ); + + assert_eq!( + cc( + &[ + p!(1, Offline, fd! {dc: msk}), + p!(2, Offline, fd! {dc: msk}), + p!(3, Online, fd! {dc: spb}), + p!(4, Online, fd! {dc: spb}), + ], + &[1, 2, 3], + &[4] + ), + // Vouters number should not fall below the optimal number + cc![AddLearnerNode(1), AddNode(4)] + ); } } diff --git a/src/traft/mod.rs b/src/traft/mod.rs index fe2adb9c24212a473e33127ebf1f925ebb0e1755..5affa193c9a08b2f761cf74a0516e3fa59af5252 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -33,6 +33,7 @@ pub type RaftId = u64; pub type RaftTerm = u64; pub type RaftIndex = u64; pub type Address = String; +pub type Distance = u64; pub const INIT_RAFT_TERM: RaftTerm = 1;