Newer
Older
use crate::traft::{Distance, RaftId};
use ::raft::prelude as raft;
use ::raft::prelude::ConfChangeType::*;
use std::collections::HashMap;
// TODO: do not use BTreeSet, it's very bad for performance.
// Instead we should use a sorted array. All operations will be faster because
// of cache locality. Inserting into/removing from a sorted array is simple:
// find the location using binary search and shift the tail of the array.
// For our purposes this is going to be much more performant that the binary tree,
// because there will be dramatically fewer memory allocations and cache misses.
use std::collections::{BTreeMap, BTreeSet};
struct RaftConf<'a> {
voters: BTreeSet<RaftId>,
learners: BTreeSet<RaftId>,
}
impl<'a> RaftConf<'a> {
fn change_single(
&mut self,
change_type: raft::ConfChangeType,
node_id: RaftId,
) -> raft::ConfChangeSingle {
// Find the reference at
// https://github.com/tikv/raft-rs/blob/v0.6.0/src/confchange/changer.rs#L162
match change_type {
AddNode => {
self.voters.insert(node_id);
self.learners.remove(&node_id);
}
AddLearnerNode => {
self.voters.remove(&node_id);
self.learners.insert(node_id);
}
RemoveNode => {
self.voters.remove(&node_id);
self.learners.remove(&node_id);
}
}
raft::ConfChangeSingle {
change_type,
node_id,
..Default::default()
}
/// Sum of distances between failure domains of `a` and each of `bs`.
fn sum_distance(all: &BTreeMap<RaftId, &Instance>, bs: &BTreeSet<RaftId>, a: &RaftId) -> Distance {
let Some(a) = all.get(a) else { return 0 };
bs.iter()
.filter_map(|raft_id| all.get(raft_id))
.map(|b| b.failure_domain.distance(&a.failure_domain))
.sum()
}
/// Among `candidates` finds one with maximum total distance to `voters`.
///
/// Returns its `RaftId` and the calculated distance.
/// Returns `None` if `candidates` set is empty.
fn find_farthest(
all: &BTreeMap<RaftId, &Instance>,
voters: &BTreeSet<RaftId>,
candidates: &BTreeSet<RaftId>,
) -> Option<(RaftId, Distance)> {
candidates
.iter()
.map(|&raft_id| (raft_id, sum_distance(all, voters, &raft_id)))
.reduce(|acc, item| if item.1 > acc.1 { item } else { acc })
}
pub(crate) fn raft_conf_change(
voters: &[RaftId],
learners: &[RaftId],
) -> Option<raft::ConfChangeV2> {
let mut raft_conf = RaftConf {
all: instances.iter().map(|p| (p.raft_id, p)).collect(),
voters: voters.iter().cloned().collect(),
learners: learners.iter().cloned().collect(),
};
let mut changes: Vec<raft::ConfChangeSingle> = vec![];
// Only an instance from tier with `can_vote = true` can be considered as voter.
let tier_can_vote = |instance: &&Instance| {
tiers
.get(instance.tier.as_str())
.expect("tier for instance should exists")
.can_vote
};
let number_of_eligible_for_vote = instances
.iter()
.filter(|instance| has_states!(instance, * -> not Expelled))
.filter(tier_can_vote)
.count();
let voters_needed = match number_of_eligible_for_vote {
5.. => 5,
3..=4 => 3,
x => x,
};
// A list of instances that can be safely promoted to voters.
let mut promotable: BTreeSet<RaftId> = instances
.iter()
// Only an instance with current state online can be promoted
.filter(|instance| has_states!(instance, Online -> Online))
.map(|instance| instance.raft_id)
// Exclude those who is already a voter.
.filter(|raft_id| !raft_conf.voters.contains(raft_id))
.collect();
let next_farthest = |raft_conf: &RaftConf, promotable: &BTreeSet<RaftId>| {
find_farthest(&raft_conf.all, &raft_conf.voters, promotable)
};
// Remove / replace voters
for voter_id in raft_conf.voters.clone().iter() {
let Some(instance) = raft_conf.all.get(voter_id) else {
// unknown instance which is not in configuration of cluster, nearly impossible
#[rustfmt::skip]
tlog!(Warning, "no info found for instance which is a raft voter: raft_id = {voter_id}");
let ccs = raft_conf.change_single(RemoveNode, *voter_id);
changes.push(ccs);
continue;
if !tier_can_vote(instance) {
// case when bootstrap leader from tier with `can_vote = false`
#[rustfmt::skip]
tlog!(Warning, "instance with name '{}' from tier '{}' with 'can_vote' = false is in raft configuration as voter, making it follower",
instance.name, instance.tier);
let ccs = raft_conf.change_single(RemoveNode, *voter_id);
changes.push(ccs);
continue;
}
if has_states!(instance, Expelled -> *) {
// Instance was already expelled, remove it entirely.
let ccs = raft_conf.change_single(RemoveNode, instance.raft_id);
changes.push(ccs);
continue;
}
if has_states!(instance, * -> Offline) || has_states!(instance, * -> Expelled) {
// A voter is shutting down or getting expelled.
// First try to replace it with another online instance.
if let Some((next_voter_id, _)) = next_farthest(&raft_conf, &promotable) {
let ccs1 = raft_conf.change_single(AddLearnerNode, instance.raft_id);
let ccs2 = raft_conf.change_single(AddNode, next_voter_id);
changes.extend_from_slice(&[ccs1, ccs2]);
promotable.remove(&next_voter_id);
if has_states!(instance, * -> Expelled) {
// If no replacement found, but instance is getting expelled, demote it anyway
let ccs = raft_conf.change_single(AddLearnerNode, instance.raft_id);
changes.push(ccs);
continue;
}
let have_excess_voters = raft_conf.voters.len() > voters_needed;
if have_excess_voters && has_states!(instance, * -> Offline) {
// If no replacement found, but there's already an excess in
// voters, the Offline instance should be the one being demoted
let ccs = raft_conf.change_single(AddLearnerNode, instance.raft_id);
changes.push(ccs);
continue;
}
for voter_id in raft_conf.voters.clone().iter().skip(voters_needed) {
// If threre're more voters that needed, remove excess ones.
// That may be the case when one of 5 instances is expelled.
let ccs = raft_conf.change_single(AddLearnerNode, *voter_id);
changes.push(ccs);
// Remove unknown / expelled learners
for learner_id in raft_conf.learners.clone().iter() {
let Some(instance) = raft_conf.all.get(learner_id) else {
// unknown instance which is not in configuration of cluster, nearly impossible
#[rustfmt::skip]
tlog!(Warning, "no info found for instance which is a raft voter: raft_id = {learner_id}");
let ccs = raft_conf.change_single(RemoveNode, *learner_id);
changes.push(ccs);
continue;
};
if has_states!(instance, Expelled -> *) {
// Instance was already expelled => remove it unconditionally.
let ccs = raft_conf.change_single(RemoveNode, instance.raft_id);
changes.push(ccs);
// Promote more voters
while raft_conf.voters.len() < voters_needed {
let Some((new_voter_id, _)) = next_farthest(&raft_conf, &promotable) else {
break;
};
let ccs = raft_conf.change_single(AddNode, new_voter_id);
changes.push(ccs);
promotable.remove(&new_voter_id);
// Redistribute existing voters according to their failure domains
for voter_id in raft_conf.voters.clone().iter() {
let mut other_voters = raft_conf.voters.clone();
other_voters.remove(voter_id);
let other_voters = other_voters;
let Some((new_voter_id, new_distance)) =
find_farthest(&raft_conf.all, &other_voters, &promotable)
else {
break;
};
if new_distance > sum_distance(&raft_conf.all, &other_voters, voter_id) {
let ccs1 = raft_conf.change_single(AddLearnerNode, *voter_id);
let ccs2 = raft_conf.change_single(AddNode, new_voter_id);
changes.extend_from_slice(&[ccs1, ccs2]);
promotable.remove(&new_voter_id);
}
}
// Promote remaining instances as learners
if has_states!(instance, Expelled -> *)
|| raft_conf.voters.contains(&instance.raft_id)
|| raft_conf.learners.contains(&instance.raft_id)
// Non-expelled instance which is neither a learner nor a voter.
// It should become a learner.
let ccs = raft_conf.change_single(AddLearnerNode, instance.raft_id);
changes.push(ccs);
}
if changes.is_empty() {
return None;
}
let conf_change = raft::ConfChangeV2 {
transition: raft::ConfChangeTransition::Auto,
changes: changes.into(),
..Default::default()
};
Some(conf_change)
}
#[cfg(test)]
mod tests {
use crate::failure_domain::FailureDomain;
use crate::instance::StateVariant::*;
use ::raft::prelude as raft;
use std::collections::{BTreeMap, BTreeSet};
macro_rules! fd {
($(,)?) => { FailureDomain::default() };
($($k:tt : $v:tt),+ $(,)?) => {
FailureDomain::from([$((stringify!($k), stringify!($v))),+])
}
}
macro_rules! p {
(
$raft_id:literal,
$current_state:ident -> $target_state:ident
current_state: State {
variant: $current_state,

Georgy Moshkin
committed
// raft_conf_change doesn't care about incarnations
incarnation: 0,
},
target_state: State {
variant: $target_state,

Georgy Moshkin
committed
// raft_conf_change doesn't care about incarnations
incarnation: 0,
},
tier: crate::tier::DEFAULT_TIER.into(),
$(failure_domain: $failure_domain,)?
}
};
(
$raft_id:literal,
p!($raft_id, $state -> $state $(, $failure_domain)?)
macro_rules! p_with_tier {
(
$raft_id:literal,
$tier:ident,
let mut instance = p!($raft_id, $state -> $state);
instance.tier = $tier.to_string();
instance
}}
}
macro_rules! cc {
[$(
$change:ident($raft_id:literal)
),*] => {{
Some(raft::ConfChangeV2 {
changes: vec![$(
raft::ConfChangeSingle {
change_type: raft::ConfChangeType::$change,
node_id: $raft_id,
..Default::default()
}
),*].into(),
transition: raft::ConfChangeTransition::Auto,
..Default::default()
})
}};
}
fn cc(
all_instances: &[Instance],
voter_ids: &[RaftId],
learner_ids: &[RaftId],
) -> Option<raft::ConfChangeV2> {
let mut all_tiers = HashMap::new();
let default_tier = Tier::default();
all_tiers.insert(default_tier.name.as_str(), &default_tier);
cc_with_tiers(all_instances, voter_ids, learner_ids, &all_tiers)
all_instances: &[Instance],
voter_ids: &[RaftId],
learner_ids: &[RaftId],
all_tiers: &HashMap<&str, &Tier>,
) -> Option<raft::ConfChangeV2> {
let mut cc = super::raft_conf_change(all_instances, voter_ids, learner_ids, all_tiers)?;
cc.changes.sort_by(|l, r| Ord::cmp(&l.node_id, &r.node_id));
Some(cc)
}
fn test_sum_distance() {
assert_eq!(
3,
sum_distance(
&BTreeMap::from([
(1, &p!(1, Online, fd! {x: A, y: B})),
(2, &p!(2, Online, fd! {x: A, y: C})),
(3, &p!(3, Online, fd! {x: D, y: C}))
]),
&BTreeSet::from([2, 3]),
&1,
)
);
assert_eq!(
0,
sum_distance(
&BTreeMap::from([(1, &p!(1, Online, fd! {x: A, y: B}))]),
&BTreeSet::new(),
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
&1,
)
);
}
#[test]
fn test_find_farthest() {
let instances = [
p!(1, Online, fd! {dc: Msk, srv: Msk1}),
p!(2, Online, fd! {dc: Msk, srv: Msk2}),
p!(3, Online, fd! {dc: Msk, srv: Msk3}),
p!(4, Online, fd! {dc: Spb, srv: Spb1}),
p!(5, Online, fd! {dc: Spb, srv: Spb2}),
p!(6, Online, fd! {dc: Spb, srv: Spb3}),
p!(7, Online, fd! {dc: Arb, srv: Arb1}),
];
let map: BTreeMap<RaftId, &Instance> = instances.iter().map(|p| (p.raft_id, p)).collect();
let (found_raft_id, found_distance) = find_farthest(
&map,
&BTreeSet::from([7, 1]),
&BTreeSet::from([2, 3, 4, 5, 6]),
)
.unwrap();
let expected = [4, 5, 6];
assert!(BTreeSet::<RaftId>::from(expected).contains(&found_raft_id));
assert_eq!(4, found_distance);
assert_eq!(
None,
find_farthest(&map, &BTreeSet::from([7, 1]), &BTreeSet::from([]))
);
}
#[test]
fn test_conf_change() {
let p1 = || p!(1, Online);
let p2 = || p!(2, Online);
let p3 = || p!(3, Online);
let p4 = || p!(4, Online);
let p5 = || p!(5, Online);
assert_eq!(
cc(&[p1(), p!(2, Offline)], &[1], &[]),
cc![AddLearnerNode(2)]
cc(&[p1(), p!(2, Offline -> Online)], &[1], &[]),
cc![AddLearnerNode(2)]
);
assert_eq!(
cc(&[p1(), p!(2, Offline -> Online)], &[1], &[2]),
// nothing to do until p2 attains current_state online
None
);
assert_eq!(
cc(&[p1(), p!(2, Online)], &[1], &[2]),
// promote p2 as soon as it attains current_state online
cc![AddNode(2)]
);
assert_eq!(
cc(&[p1(), p!(2, Online -> Offline)], &[1, 2], &[]),
// don't reduce total voters number
None
);
assert_eq!(
cc(&[p1(), p!(2, Online -> Offline)], &[1], &[2]),
// p2 went offline even before being promoted.
None
);
assert_eq!(
cc(&[p1(), p2(), p3(), p4()], &[1, 2, 3], &[4]),
// 4 instances -> 3 voters
None
);
assert_eq!(
cc(&[p1(), p2(), p3(), p4(), p5()], &[1, 2, 3], &[4, 5]),
// 5 and more instances -> 5 voters
cc![AddNode(4), AddNode(5)]
);
assert_eq!(
cc(
&[p1(), p2(), p!(3, Online -> Offline), p4()],
&[1, 2, 3],
&[4]
),
// failover a voter
cc![AddLearnerNode(3), AddNode(4)]
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
);
assert_eq!(
cc(
&[p1(), p2(), p3(), p4(), p5(), p!(6, Online -> Offline)],
&[1, 2, 3, 4, 5],
&[6]
),
None
);
assert_eq!(
cc(
&[
p!(1, Online -> Offline),
p!(2, Online -> Offline),
p!(3, Online -> Offline),
p!(4, Online -> Offline),
p!(5, Online -> Offline)
],
&[1, 2, 3, 4, 5],
&[]
),
None
);
assert_eq!(
cc(&[p1()], &[1, 99], &[]),
// Unknown voters should be removed
cc![RemoveNode(99)]
);
assert_eq!(
cc(&[p1()], &[1], &[99]),
// Unknown learners are removed as well
cc![RemoveNode(99)]
);
assert_eq!(
cc(&[p1(), p!(2, Online -> Expelled)], &[1, 2], &[]),
// If a voters starts getting expelled, it becomes a learner
cc![AddLearnerNode(2)]
);
assert_eq!(
cc(&[p1(), p!(2, Online -> Expelled)], &[1], &[2]),
// If a learner starts getting expelled, nothing happens
None
);
assert_eq!(
cc(&[p1(), p!(2, Expelled -> Expelled)], &[1, 2], &[]),
// Expelled voters should be removed
cc(&[p1(), p!(2, Expelled -> Expelled)], &[1], &[2]),
// Expelled learners are removed too
&[p1(), p2(), p3(), p4(), p!(5, Expelled -> Expelled)],
&[1, 2, 3, 4, 5],
&[]
),
// Tricky case.
// When one of five voters is expelled,
// only 3 voters should remain there.
cc![AddLearnerNode(4), RemoveNode(5)]
);
assert_eq!(
&[p1(), p!(2, Online -> Offline), p!(3, Offline -> Online)],
&[1, 2],
&[3]
),
// Tricky case.
// Voter p2 goes offline, but there's no replacement.
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
assert_eq!(
cc(
&[
p!(1, Online, fd! {dc: Arb}),
p!(2, Online, fd! {dc: Msk}),
p!(3, Online, fd! {dc: Msk}),
p!(4, Online, fd! {dc: Msk}),
p!(5, Online, fd! {dc: Spb}),
p!(6, Offline, fd! {dc: Spb}),
p!(7, Online, fd! {dc: Spb}),
],
&[1, 2, 3, 5],
&[4, 7]
),
// New voter should respect failure domain
cc![AddLearnerNode(6), AddNode(7)]
);
assert_eq!(
cc(
&[
p!(1, Online, fd! {dc: Arb}),
p!(2, Online, fd! {dc: Msk}),
p!(3, Online, fd! {dc: Msk}),
p!(4, Online, fd! {dc: Msk}),
p!(5, Online, fd! {dc: Msk}),
p!(6, Online, fd! {dc: Spb}),
p!(7, Online, fd! {dc: Spb}),
p!(8, Online, fd! {dc: Spb}),
p!(9, Online, fd! {dc: Spb}),
],
&[1, 2, 3, 4, 5],
&[6, 7, 8, 9]
),
// Existing voters should be redistributed according to failure domains
cc![AddLearnerNode(2), AddLearnerNode(3), AddNode(6), AddNode(7)]
);
assert_eq!(
cc(
&[
p!(1, Online, fd! {x: _1, y: _5}),
p!(2, Online, fd! {x: _2, y: _2}),
p!(3, Online, fd! {x: _3, y: _3}),
p!(4, Online, fd! {x: _4, y: _4}),
p!(5, Online, fd! {x: _5, y: _5}),
p!(6, Online, fd! {x: _1, y: _2}),
],
&[2, 3, 4, 5, 6],
&[1]
),
// Voter should not be replaced if candidate has same distance to other voters
None
);
assert_eq!(
cc(
&[
p!(1, Offline, fd! {dc: msk}),
p!(2, Offline, fd! {dc: msk}),
p!(3, Online, fd! {dc: spb}),
p!(4, Online, fd! {dc: spb}),
],
&[1, 2, 3],
&[4]
),
// Vouters number should not fall below the optimal number
cc![AddLearnerNode(1), AddNode(4)]
);
assert_eq!(
cc(
&[
p!(1, Online),
p!(2, Online -> Expelled),
p!(3, Online -> Offline),
p!(4, Online),
p!(5, Online),
],
&[1, 2, 3, 4, 5],
&[]
),
// One instance is getting expelled, now total voter count is 3, and
// we demote the 2 degenerate instances
cc![AddLearnerNode(2), AddLearnerNode(3)]
);
let mut tiers = HashMap::new();
const VOTER_TIER_NAME: &str = "voter";
const FOLLOWER_TIER_NAME: &str = "follower";
let voter_tier = Tier {
name: VOTER_TIER_NAME.into(),
replication_factor: 1,
can_vote: true,
};
let router_tier = Tier {
name: FOLLOWER_TIER_NAME.into(),
replication_factor: 1,
can_vote: false,
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
};
tiers.insert(voter_tier.name.as_str(), &voter_tier);
tiers.insert(router_tier.name.as_str(), &router_tier);
assert_eq!(
cc_with_tiers(
&[
p_with_tier!(1, VOTER_TIER_NAME, Online),
p_with_tier!(2, FOLLOWER_TIER_NAME, Online),
],
&[1],
&[2],
&tiers
),
// instance from router tier couldn't be upgraded to voter
None
);
assert_eq!(
cc_with_tiers(
&[
p_with_tier!(1, VOTER_TIER_NAME, Online),
p_with_tier!(2, FOLLOWER_TIER_NAME, Online),
],
&[2],
&[1],
&tiers
),
// case when bootstrap leader from tier with falsy `can_vote`
cc![AddNode(1), RemoveNode(2), AddLearnerNode(2)]
);
assert_eq!(
cc_with_tiers(
&[
p_with_tier!(1, VOTER_TIER_NAME, Expelled),
p_with_tier!(2, FOLLOWER_TIER_NAME, Online),
],
&[1],
&[2],
&tiers
),
// remove expelled voter
cc![RemoveNode(1)]
);
assert_eq!(
cc_with_tiers(
&[
p_with_tier!(1, FOLLOWER_TIER_NAME, Expelled),
p_with_tier!(2, FOLLOWER_TIER_NAME, Online),
],
&[1],
&[2],
&tiers
),
// remove expelled follower
cc![RemoveNode(1)]
);
assert_eq!(
cc_with_tiers(
&[
p_with_tier!(1, VOTER_TIER_NAME, Online),
p_with_tier!(2, VOTER_TIER_NAME, Online),
],
&[],
&[1, 2],
&tiers
),
cc![AddNode(1), AddNode(2)]
);