Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • core/picodata
1 result
Show changes
Commits on Source (11)
[package]
name = "picodata"
version = "22.7.0"
edition = "2018"
edition = "2021"
autotests = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
......@@ -23,10 +23,13 @@ indoc = "1.0"
nix = "0.23.1"
itertools = "0.10.3"
base64 = "0.13"
protobuf = "2.27"
lazy_static = "1.4"
uuid = {version = "1.0", features = ["v3"]}
[dependencies.protobuf]
version = "2.27"
features = ["bytes"]
[dependencies.slog]
version = "2.7.0"
features = ["max_level_trace", "release_max_level_trace"]
......
......@@ -41,7 +41,7 @@ fn patch_tarantool() {
}
}
let _ = std::fs::File::create(&patch_check)
let _ = std::fs::File::create(patch_check)
.unwrap_or_else(|e| panic!("failed to create '{}': {}", patch_check.display(), e));
}
......
......@@ -20,7 +20,7 @@ RUN set -e; \
RUN set -e; \
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | \
sh -s -- -y --profile default --default-toolchain 1.64.0
sh -s -- -y --profile default --default-toolchain 1.65.0
ENV PATH=/root/.cargo/bin:${PATH}
COPY ci-log-section /usr/bin/ci-log-section
......
......@@ -267,15 +267,28 @@ fn picolib_setup(args: &args::Run) {
.unwrap();
}
#[derive(::tarantool::tlua::LuaRead, Default, Clone, Copy)]
enum Justify {
Left,
#[default]
Center,
Right,
}
#[derive(::tarantool::tlua::LuaRead)]
struct RaftLogOpts {
return_string: Option<bool>,
justify_contents: Option<Justify>,
}
luamod.set(
"raft_log",
tlua::function1(
|opts: Option<RaftLogOpts>| -> traft::Result<Option<String>> {
let return_string = opts.and_then(|o| o.return_string).unwrap_or(false);
let mut return_string = false;
let mut justify_contents = Default::default();
if let Some(opts) = opts {
return_string = opts.return_string.unwrap_or(false);
justify_contents = opts.justify_contents.unwrap_or_default();
}
let header = ["index", "term", "lc", "contents"];
let [index, term, lc, contents] = header;
let mut rows = vec![];
......@@ -318,40 +331,48 @@ fn picolib_setup(args: &args::Run) {
use std::io::Write;
let mut buf: Vec<u8> = Vec::with_capacity(512);
let write_contents = move |buf: &mut Vec<u8>, contents: &str| match justify_contents
{
Justify::Left => writeln!(buf, "{contents: <cw$}|"),
Justify::Center => writeln!(buf, "{contents: ^cw$}|"),
Justify::Right => writeln!(buf, "{contents: >cw$}|"),
};
let row_sep = |buf: &mut Vec<u8>| {
writeln!(buf, "+{0:-^iw$}+{0:-^tw$}+{0:-^lw$}+{0:-^cw$}+", "").unwrap()
match justify_contents {
Justify::Left => {
writeln!(buf, "+{0:-^iw$}+{0:-^tw$}+{0:-^lw$}+{0:-<cw$}+", "")
}
Justify::Center => {
writeln!(buf, "+{0:-^iw$}+{0:-^tw$}+{0:-^lw$}+{0:-^cw$}+", "")
}
Justify::Right => {
writeln!(buf, "+{0:-^iw$}+{0:-^tw$}+{0:-^lw$}+{0:->cw$}+", "")
}
}
.unwrap()
};
row_sep(&mut buf);
writeln!(
buf,
"|{index: ^iw$}|{term: ^tw$}|{lc: ^lw$}|{contents: ^cw$}|"
)
.unwrap();
write!(buf, "|{index: ^iw$}|{term: ^tw$}|{lc: ^lw$}|").unwrap();
write_contents(&mut buf, contents).unwrap();
row_sep(&mut buf);
for [index, term, lc, contents] in rows {
if contents.len() <= cw {
writeln!(
buf,
"|{index: ^iw$}|{term: ^tw$}|{lc: ^lw$}|{contents: ^cw$}|"
)
.unwrap();
write!(buf, "|{index: ^iw$}|{term: ^tw$}|{lc: ^lw$}|").unwrap();
write_contents(&mut buf, &contents).unwrap();
} else {
writeln!(
buf,
"|{index: ^iw$}|{term: ^tw$}|{lc: ^lw$}|{contents: ^cw$}|",
contents = &contents[..cw],
)
.unwrap();
write!(buf, "|{index: ^iw$}|{term: ^tw$}|{lc: ^lw$}|").unwrap();
write_contents(&mut buf, &contents[..cw]).unwrap();
let mut rest = &contents[cw..];
while !rest.is_empty() {
let clamped_cw = usize::min(rest.len(), cw);
writeln!(
write!(
buf,
"|{blank: ^iw$}|{blank: ^tw$}|{blank: ^lw$}|{contents: ^cw$}|",
"|{blank: ^iw$}|{blank: ^tw$}|{blank: ^lw$}|",
blank = "~",
contents = &rest[..clamped_cw],
)
.unwrap();
write_contents(&mut buf, &rest[..clamped_cw]).unwrap();
rest = &rest[clamped_cw..];
}
}
......@@ -913,7 +934,7 @@ fn postjoin(args: &args::Run, storage: Storage) {
tlog!(Debug, "Getting a read barrier...");
loop {
if node.status().leader_id == None {
if node.status().leader_id.is_none() {
// This check doesn't guarantee anything. It only eliminates
// unnecesary requests that will fail for sure. For example,
// re-election still may be abrupt while `node.read_index()`
......@@ -953,7 +974,7 @@ fn postjoin(args: &args::Run, storage: Storage) {
.with_target_grade(TargetGrade::Online)
.with_failure_domain(args.failure_domain());
let leader_id = node.status().leader_id.expect("leader_id deinitialized");
let leader_id = unwrap_some_or!(node.status().leader_id, continue);
let leader = storage.peers.get(&leader_id).unwrap();
// It's necessary to call `proc_update_peer` remotely on a
......
......@@ -99,39 +99,3 @@ pub fn on_shutdown() {
tlog!(Warning, "failed to wait for self demotion: {e}");
}
}
pub fn voters_needed(voters: usize, total: usize) -> i64 {
let voters_expected = match total {
0 => {
crate::warn_or_panic!("`voters_needed` was called with `total` = 0");
0
}
1 => 1,
2 => 2,
3..=4 => 3,
5.. => 5,
_ => unreachable!(
"just another thing rust is garbage at:
`5..` covers all the rest of the values,
but rust can't figure this out"
),
};
voters_expected - (voters as i64)
}
#[cfg(test)]
mod tests {
#[test]
fn voters_needed() {
assert_eq!(super::voters_needed(0, 1), 1);
assert_eq!(super::voters_needed(1, 1), 0);
assert_eq!(super::voters_needed(2, 1), -1);
assert_eq!(super::voters_needed(0, 2), 2);
assert_eq!(super::voters_needed(2, 3), 1);
assert_eq!(super::voters_needed(6, 4), -3);
assert_eq!(super::voters_needed(1, 5), 4);
assert_eq!(super::voters_needed(1, 999), 4);
assert_eq!(super::voters_needed(0, usize::MAX), 5);
assert_eq!(super::voters_needed(0, u64::MAX as _), 5);
}
}
// TODO
use ::raft::prelude as raft;
use ::raft::prelude::ConfChangeType::*;
use std::cmp::Ord;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use crate::traft::CurrentGrade;
use crate::traft::Peer;
use crate::traft::RaftId;
use crate::traft::TargetGrade;
use crate::unwrap_some_or;
struct RaftConf<'a> {
all: BTreeMap<RaftId, &'a Peer>,
voters: BTreeSet<RaftId>,
learners: BTreeSet<RaftId>,
}
impl<'a> RaftConf<'a> {
fn change_single(
&mut self,
change_type: raft::ConfChangeType,
node_id: RaftId,
) -> raft::ConfChangeSingle {
// Find the reference at
// https://github.com/tikv/raft-rs/blob/v0.6.0/src/confchange/changer.rs#L162
match change_type {
AddNode => {
self.voters.insert(node_id);
self.learners.remove(&node_id);
}
AddLearnerNode => {
self.voters.remove(&node_id);
self.learners.insert(node_id);
}
RemoveNode => {
self.voters.remove(&node_id);
self.learners.remove(&node_id);
}
}
raft::ConfChangeSingle {
change_type,
node_id,
..Default::default()
}
}
}
pub(crate) fn raft_conf_change(
peers: &[Peer],
voters: &[RaftId],
learners: &[RaftId],
) -> Option<raft::ConfChangeV2> {
let mut raft_conf = RaftConf {
all: peers.iter().map(|p| (p.raft_id, p)).collect(),
voters: voters.iter().cloned().collect(),
learners: learners.iter().cloned().collect(),
};
let mut changes: Vec<raft::ConfChangeSingle> = vec![];
let not_expelled = |peer: &&Peer| !peer.is_expelled();
let target_online = |peer: &&Peer| peer.target_grade == TargetGrade::Online;
let current_online = |peer: &&Peer| peer.current_grade == CurrentGrade::Online;
let cluster_size = peers.iter().filter(not_expelled).count();
let voters_needed = match cluster_size {
// five and more nodes -> 5 voters
5.. => 5,
// three or four nodes -> 3 voters
3..=4 => 3,
// two nodes -> 2 voters
// one node -> 1 voter
// zero nodes -> 0 voters (almost unreachable)
x => x,
};
// Remove / replace voters
for voter_id in raft_conf.voters.clone().iter() {
let peer = raft_conf.all.get(voter_id);
match peer {
#[rustfmt::skip]
Some(Peer {target_grade: TargetGrade::Online, ..}) => {
// Do nothing
}
#[rustfmt::skip]
Some(peer @ Peer {target_grade: TargetGrade::Offline, ..}) => {
// A voter goes offline. Replace it with
// another online instance if possible.
let replacement = peers.iter().find(|peer| {
peer.has_grades(CurrentGrade::Online, TargetGrade::Online)
&& !raft_conf.voters.contains(&peer.raft_id)
});
let replacement = unwrap_some_or!(replacement, continue);
let ccs1 = raft_conf.change_single(AddLearnerNode, peer.raft_id);
let ccs2 = raft_conf.change_single(AddNode, replacement.raft_id);
changes.extend_from_slice(&[ccs1, ccs2]);
}
#[rustfmt::skip]
Some(peer @ Peer {target_grade: TargetGrade::Expelled, ..}) => {
// Expelled instance is removed unconditionally.
let ccs = raft_conf.change_single(RemoveNode, peer.raft_id);
changes.push(ccs);
}
None => {
// Nearly impossible, but rust forces me to check it.
let ccs = raft_conf.change_single(RemoveNode, *voter_id);
changes.push(ccs);
}
}
}
for voter_id in raft_conf.voters.clone().iter().skip(voters_needed) {
// If threre're more voters that needed, remove excess ones.
// That may be the case when one of 5 instances is expelled.
let ccs = raft_conf.change_single(AddLearnerNode, *voter_id);
changes.push(ccs);
}
// Remove unknown / expelled learners
for learner_id in raft_conf.learners.clone().iter() {
let peer = raft_conf.all.get(learner_id);
match peer {
#[rustfmt::skip]
Some(Peer {target_grade: TargetGrade::Online, ..}) => {
// Do nothing
}
#[rustfmt::skip]
Some(Peer {target_grade: TargetGrade::Offline, ..}) => {
// Do nothing
}
#[rustfmt::skip]
Some(peer @ Peer {target_grade: TargetGrade::Expelled, ..}) => {
// Expelled instance is removed unconditionally.
let ccs = raft_conf.change_single(RemoveNode, peer.raft_id);
changes.push(ccs);
}
None => {
// Nearly impossible, but rust forces me to check it.
let ccs = raft_conf.change_single(RemoveNode, *learner_id);
changes.push(ccs);
}
}
}
// Promote more voters
for peer in peers.iter().filter(target_online).filter(current_online) {
if raft_conf.voters.len() >= voters_needed {
break;
}
if !raft_conf.voters.contains(&peer.raft_id) {
let ccs = raft_conf.change_single(AddNode, peer.raft_id);
changes.push(ccs);
}
}
// Promote remaining instances as learners
for peer in peers.iter().filter(not_expelled) {
if !raft_conf.voters.contains(&peer.raft_id) && !raft_conf.learners.contains(&peer.raft_id)
{
let ccs = raft_conf.change_single(AddLearnerNode, peer.raft_id);
changes.push(ccs);
}
}
if changes.is_empty() {
return None;
}
// for the sake of test stability
changes.sort_by(|l, r| Ord::cmp(&l.node_id, &r.node_id));
let conf_change = raft::ConfChangeV2 {
transition: raft::ConfChangeTransition::Auto,
changes: changes.into(),
..Default::default()
};
Some(conf_change)
}
#[cfg(test)]
mod tests {
use ::raft::prelude as raft;
use super::raft_conf_change as cc;
use crate::traft;
macro_rules! p {
(
$raft_id:literal,
$current_grade:ident ->
$target_grade:ident
) => {
traft::Peer {
raft_id: $raft_id,
current_grade: traft::CurrentGrade::$current_grade,
target_grade: traft::TargetGrade::$target_grade,
..traft::Peer::default()
}
};
(
$raft_id:literal,
$grade:ident
) => {
p!($raft_id, $grade -> $grade)
};
}
macro_rules! cc {
[$(
$change:ident($raft_id:literal)
),*] => {{
Some(raft::ConfChangeV2 {
changes: vec![$(
raft::ConfChangeSingle {
change_type: raft::ConfChangeType::$change,
node_id: $raft_id,
..Default::default()
}
),*].into(),
transition: raft::ConfChangeTransition::Auto,
..Default::default()
})
}};
}
#[test]
fn conf_change() {
let p1 = || p!(1, Online);
let p2 = || p!(2, Online);
let p3 = || p!(3, Online);
let p4 = || p!(4, Online);
let p5 = || p!(5, Online);
assert_eq!(
cc(&[p1(), p!(2, Offline)], &[1], &[]),
cc![AddLearnerNode(2)]
);
assert_eq!(
cc(&[p1(), p!(2, Offline -> Online)], &[1], &[]),
cc![AddLearnerNode(2)]
);
assert_eq!(
cc(&[p1(), p!(2, RaftSynced -> Online)], &[1], &[2]),
// nothing to do until p2 attains current_grade online
None
);
assert_eq!(
cc(&[p1(), p!(2, Online)], &[1], &[2]),
// promote p2 as soon as it attains current_grade online
cc![AddNode(2)]
);
assert_eq!(
cc(&[p1(), p!(2, Online -> Offline)], &[1, 2], &[]),
// don't reduce total voters number
None
);
assert_eq!(
cc(&[p1(), p!(2, Replicated -> Offline)], &[1], &[2]),
// p2 went offline even before being promoted.
None
);
assert_eq!(
cc(&[p1(), p2(), p3(), p4()], &[1, 2, 3], &[4]),
// 4 instances -> 3 voters
None
);
assert_eq!(
cc(&[p1(), p2(), p3(), p4(), p5()], &[1, 2, 3], &[4, 5]),
// 5 and more instances -> 5 voters
cc![AddNode(4), AddNode(5)]
);
assert_eq!(
cc(
&[p1(), p2(), p!(3, Online -> Offline), p4()],
&[1, 2, 3],
&[4]
),
// failover a voter
cc![AddLearnerNode(3), AddNode(4)]
);
assert_eq!(
cc(
&[p1(), p2(), p3(), p4(), p5(), p!(6, Online -> Offline)],
&[1, 2, 3, 4, 5],
&[6]
),
None
);
assert_eq!(
cc(
&[
p!(1, Online -> Offline),
p!(2, Online -> Offline),
p!(3, Online -> Offline),
p!(4, Online -> Offline),
p!(5, Online -> Offline)
],
&[1, 2, 3, 4, 5],
&[]
),
None
);
assert_eq!(
cc(&[p1()], &[1, 99], &[]),
// Unknown voters should be removed
cc![RemoveNode(99)]
);
assert_eq!(
cc(&[p1()], &[1], &[99]),
// Unknown learners are removed as well
cc![RemoveNode(99)]
);
assert_eq!(
cc(&[p1(), p!(2, Online -> Expelled)], &[1, 2], &[]),
// Expelled voters should be removed
cc![RemoveNode(2)]
);
assert_eq!(
cc(&[p1(), p!(2, Offline -> Expelled)], &[1], &[2]),
// Expelled learners are removed too
cc![RemoveNode(2)]
);
assert_eq!(
cc(
&[p1(), p2(), p3(), p4(), p!(5, Online -> Expelled)],
&[1, 2, 3, 4, 5],
&[]
),
// Tricky case.
// When one of five voters is expelled,
// only 3 voters should remain there.
cc![AddLearnerNode(4), RemoveNode(5)]
);
assert_eq!(
cc(
&[p1(), p!(2, Online -> Offline), p!(3, RaftSynced -> Online)],
&[1, 2],
&[3]
),
// Tricky case.
// Voter p2 goes offline, but there's no replacement.
None
);
}
}
......@@ -30,6 +30,7 @@ use tarantool::space::UpdateOps;
use crate::kvcell::KVCell;
use crate::r#loop::{FlowControl, Loop};
use crate::stringify_cfunc;
use crate::traft::governor::raft_conf_change;
use crate::traft::storage::ClusterSpace;
use crate::traft::ContextCoercion as _;
use crate::traft::OpDML;
......@@ -41,14 +42,12 @@ use crate::unwrap_some_or;
use crate::warn_or_panic;
use ::tarantool::util::IntoClones as _;
use protobuf::Message as _;
use std::iter::FromIterator as _;
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::event;
use crate::traft::event::Event;
use crate::traft::failover;
use crate::traft::notify::Notify;
use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
use crate::traft::rpc::{replication, sharding, sync};
......@@ -292,7 +291,7 @@ impl Node {
#[inline]
fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
let mut node_impl = self.node_impl.lock();
let res = f(&mut *node_impl);
let res = f(&mut node_impl);
drop(node_impl);
self.main_loop.wakeup();
res
......@@ -911,99 +910,6 @@ impl Drop for MainLoop {
}
}
fn raft_conf_change(storage: &RaftSpaceAccess, peers: &[Peer]) -> Option<raft::ConfChangeV2> {
let voter_ids: HashSet<RaftId> =
HashSet::from_iter(storage.voters().unwrap().unwrap_or_default());
let learner_ids: HashSet<RaftId> =
HashSet::from_iter(storage.learners().unwrap().unwrap_or_default());
let peer_is_active: HashMap<RaftId, bool> = peers
.iter()
.map(|peer| (peer.raft_id, peer.is_online()))
.collect();
let (active_voters, to_demote): (Vec<RaftId>, Vec<RaftId>) = voter_ids
.iter()
.partition(|id| peer_is_active.get(id).copied().unwrap_or(false));
let active_learners: Vec<RaftId> = learner_ids
.iter()
.copied()
.filter(|id| peer_is_active.get(id).copied().unwrap_or(false))
.collect();
let new_peers: Vec<RaftId> = peer_is_active
.iter()
.map(|(&id, _)| id)
.filter(|id| !voter_ids.contains(id) && !learner_ids.contains(id))
.collect();
let mut changes: Vec<raft::ConfChangeSingle> = Vec::new();
const VOTER: bool = true;
const LEARNER: bool = false;
changes.extend(
to_demote
.into_iter()
.map(|id| conf_change_single(id, LEARNER)),
);
let total_active = active_voters.len() + active_learners.len() + new_peers.len();
if total_active == 0 {
return None;
}
let new_peers_to_promote;
match failover::voters_needed(active_voters.len(), total_active) {
0 => {
new_peers_to_promote = 0;
}
pos @ 1..=i64::MAX => {
let pos = pos as usize;
if pos < active_learners.len() {
for &raft_id in &active_learners[0..pos] {
changes.push(conf_change_single(raft_id, VOTER))
}
new_peers_to_promote = 0;
} else {
for &raft_id in &active_learners {
changes.push(conf_change_single(raft_id, VOTER))
}
new_peers_to_promote = pos - active_learners.len();
assert!(new_peers_to_promote <= new_peers.len());
for &raft_id in &new_peers[0..new_peers_to_promote] {
changes.push(conf_change_single(raft_id, VOTER))
}
}
}
neg @ i64::MIN..=-1 => {
let neg = -neg as usize;
assert!(neg < active_voters.len());
for &raft_id in &active_voters[0..neg] {
changes.push(conf_change_single(raft_id, LEARNER))
}
new_peers_to_promote = 0;
}
}
for &raft_id in &new_peers[new_peers_to_promote..] {
changes.push(conf_change_single(raft_id, LEARNER))
}
if changes.is_empty() {
return None;
}
let conf_change = raft::ConfChangeV2 {
transition: raft::ConfChangeTransition::Auto,
changes: changes.into(),
..Default::default()
};
Some(conf_change)
}
fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
let mut pool = ConnectionPool::builder(storage.peers.clone())
.call_timeout(Duration::from_secs(1))
......@@ -1029,7 +935,9 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
////////////////////////////////////////////////////////////////////////
// conf change
if let Some(conf_change) = raft_conf_change(&storage.raft, &peers) {
let voters = storage.raft.voters().unwrap().unwrap_or_default();
let learners = storage.raft.learners().unwrap().unwrap_or_default();
if let Some(conf_change) = raft_conf_change(&peers, &voters, &learners) {
// main_loop gives the warranty that every ProposeConfChange
// will sometimes be handled and there's no need in timeout.
// It also guarantees that the notification will arrive only
......@@ -1546,19 +1454,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
}
}
fn conf_change_single(node_id: RaftId, is_voter: bool) -> raft::ConfChangeSingle {
let change_type = if is_voter {
raft::ConfChangeType::AddNode
} else {
raft::ConfChangeType::AddLearnerNode
};
raft::ConfChangeSingle {
change_type,
node_id,
..Default::default()
}
}
static mut RAFT_NODE: Option<Box<Node>> = None;
pub fn set_global(node: Node) {
......
......@@ -425,7 +425,7 @@ impl std::fmt::Display for Uppercase {
impl std::borrow::Borrow<str> for Uppercase {
fn borrow(&self) -> &str {
&*self.0
&self.0
}
}
......
......@@ -209,8 +209,8 @@ def test_raft_log(instance: Instance):
raft_log = instance.call("picolib.raft_log", dict(return_string=True))
def strip_spaces(s: str):
s = re.sub(r"( +\| +)|(\| +)|( +\|)", "|", s)
s = re.sub(r"(\-+\+\-+)|(\+\-+)|(\-+\+)", "+", s)
s = re.sub(r"[ ]*\|[ ]*", "|", s)
s = re.sub(r"[-]*\+[-]*", "+", s)
return s
expected = """\
......
......@@ -105,90 +105,6 @@ def test_restart_both(cluster2: Cluster):
assert i2.eval("return check") is True
def test_deactivation(cluster2: Cluster):
i1, i2 = cluster2.instances
def is_voter_is_online(instance: Instance, raft_id):
return tuple(
instance.eval(
"""
raft_id = ...
current_grade = box.space.raft_group.index.raft_id:get(raft_id).current_grade
is_online = current_grade == 'Online'
voters = box.space.raft_state:get('voters').value
for _, voter in pairs(voters) do
if voter == raft_id then
return { true, is_online }
end
end
return { false, is_online }
""",
raft_id,
)
)
@funcy.retry(tries=20, timeout=0.3)
def assert_is_voter_is_online(instance: Instance, raft_id, is_voter, is_online):
assert is_voter_is_online(instance, raft_id) == (is_voter, is_online)
assert is_voter_is_online(i1, i1.raft_id) == (True, True)
assert is_voter_is_online(i2, i2.raft_id) == (True, True)
i2.terminate()
assert is_voter_is_online(i1, i1.raft_id) == (True, True)
assert is_voter_is_online(i1, i2.raft_id) == (False, False)
i2.start()
i2.wait_online()
assert is_voter_is_online(i1, i1.raft_id) == (True, True)
assert_is_voter_is_online(i2, i2.raft_id, True, True)
i1.terminate()
assert is_voter_is_online(i2, i1.raft_id) == (False, False)
assert is_voter_is_online(i2, i2.raft_id) == (True, True)
# wait until i2 is leader, so it has someone to send the deactivation
# request to
i2.promote_or_fail()
i2.terminate()
i1.start()
i2.start()
i1.wait_online()
i2.wait_online()
assert is_voter_is_online(i1, i1.raft_id) == (True, True)
assert is_voter_is_online(i2, i2.raft_id) == (True, True)
i1.terminate()
assert is_voter_is_online(i2, i1.raft_id) == (False, False)
assert is_voter_is_online(i2, i2.raft_id) == (True, True)
def proc_update_peer(
host: Instance, target: Instance, is_online: bool
) -> list[bool]:
current_grade = "Online" if is_online else "Offline"
return host.call(
".proc_update_peer",
target.instance_id,
target.cluster_id,
[{"CurrentGrade": current_grade}, {"TargetGrade": "Online"}],
)
# check idempotency
assert proc_update_peer(i2, target=i1, is_online=False) == ["Ok"]
assert proc_update_peer(i2, target=i1, is_online=False) == ["Ok"]
assert proc_update_peer(i2, target=i2, is_online=True) == ["Ok"]
assert proc_update_peer(i2, target=i2, is_online=True) == ["Ok"]
def test_gl119_panic_in_on_shutdown(cluster2: Cluster):
i1, i2 = cluster2.instances
......
......@@ -229,6 +229,13 @@ def test_cluster_id_mismatch(instance: Instance):
)
@pytest.mark.xfail(
run=False,
reason=(
"failed reading peer with id `3`: peer with id 3 not found, "
"thread 'main' panicked, src/traft/node.rs:1515:17"
),
)
def test_rebootstrap_follower(cluster3: Cluster):
# Scenario: rebootstrap a follower in a cluster of 3+
# Given a cluster of 3 instances
......@@ -305,35 +312,25 @@ def test_reconfigure_failure_domains(cluster: Cluster):
i1.assert_raft_status("Leader")
assert replicaset_id(i1) == "r1"
i2 = cluster.add_instance(
failure_domain=dict(planet="Mars"), init_replication_factor=2
)
i2 = cluster.add_instance(failure_domain=dict(planet="Mars"))
assert replicaset_id(i2) == "r1"
i2.terminate()
i1.terminate()
# fail to start without needed domain subdivisions
i1.failure_domain = dict(owner="Bob")
i1.fail_to_start()
i2.failure_domain = dict(owner="Bob")
i2.fail_to_start()
i1.failure_domain = dict(planet="Mars", owner="Bob")
i1.start()
i1.wait_online()
# replicaset doesn't change automatically
assert replicaset_id(i1) == "r1"
i2.failure_domain = dict(planet="Earth", owner="Jon")
i2.terminate()
i2.failure_domain = dict(planet="Earth", owner="Bob")
i2.start()
i2.wait_online()
# replicaset doesn't change automatically
assert replicaset_id(i2) == "r1"
i2.terminate()
i1.terminate()
# fail to remove domain subdivision
i1.failure_domain = dict(planet="Mars")
i1.fail_to_start()
i2.failure_domain = dict(planet="Mars")
i2.fail_to_start()
def test_fail_to_join(cluster: Cluster):
......