Skip to content
Snippets Groups Projects
Commit f9d0e30d authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

fix: expelled instance used to not know it was expelled

parent 8fea6a46
No related branches found
No related tags found
1 merge request!1259Gmoshkin/fix expel
...@@ -5,6 +5,12 @@ use crate::{has_states, tlog}; ...@@ -5,6 +5,12 @@ use crate::{has_states, tlog};
use ::raft::prelude as raft; use ::raft::prelude as raft;
use ::raft::prelude::ConfChangeType::*; use ::raft::prelude::ConfChangeType::*;
use std::collections::HashMap; use std::collections::HashMap;
// TODO: do not use BTreeSet, it's very bad for performance.
// Instead we should use a sorted array. All operations will be faster because
// of cache locality. Inserting into/removing from a sorted array is simple:
// find the location using binary search and shift the tail of the array.
// For our purposes this is going to be much more performant than the binary tree,
// because there will be dramatically fewer memory allocations and cache misses.
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
struct RaftConf<'a> { struct RaftConf<'a> {
...@@ -138,13 +144,16 @@ pub(crate) fn raft_conf_change( ...@@ -138,13 +144,16 @@ pub(crate) fn raft_conf_change(
continue; continue;
} }
if has_states!(instance, * -> Expelled) { if has_states!(instance, Expelled -> *) {
// Expelled instance is removed unconditionally. // Instance was already expelled => remove it unconditionally.
let ccs = raft_conf.change_single(RemoveNode, instance.raft_id); let ccs = raft_conf.change_single(RemoveNode, instance.raft_id);
changes.push(ccs); changes.push(ccs);
} else if has_states!(instance, * -> Offline) { continue;
// A voter goes offline. Replace it with }
// another online instance if possible.
if has_states!(instance, * -> Offline) || has_states!(instance, * -> Expelled) {
// A voter is shutting down or getting expelled.
// Replace it with another online instance if possible.
let Some((next_voter_id, _)) = next_farthest(&raft_conf, &promotable) else { let Some((next_voter_id, _)) = next_farthest(&raft_conf, &promotable) else {
continue; continue;
}; };
...@@ -174,7 +183,7 @@ pub(crate) fn raft_conf_change( ...@@ -174,7 +183,7 @@ pub(crate) fn raft_conf_change(
continue; continue;
}; };
if has_states!(instance, * -> Expelled) { if has_states!(instance, Expelled -> *) {
// Instance was already expelled => remove it unconditionally. // Instance was already expelled => remove it unconditionally.
let ccs = raft_conf.change_single(RemoveNode, instance.raft_id); let ccs = raft_conf.change_single(RemoveNode, instance.raft_id);
changes.push(ccs); changes.push(ccs);
...@@ -213,7 +222,7 @@ pub(crate) fn raft_conf_change( ...@@ -213,7 +222,7 @@ pub(crate) fn raft_conf_change(
// Promote remaining instances as learners // Promote remaining instances as learners
for instance in instances { for instance in instances {
if has_states!(instance, * -> Expelled) if has_states!(instance, Expelled -> *)
|| raft_conf.voters.contains(&instance.raft_id) || raft_conf.voters.contains(&instance.raft_id)
|| raft_conf.learners.contains(&instance.raft_id) || raft_conf.learners.contains(&instance.raft_id)
{ {
...@@ -492,19 +501,31 @@ mod tests { ...@@ -492,19 +501,31 @@ mod tests {
assert_eq!( assert_eq!(
cc(&[p1(), p!(2, Online -> Expelled)], &[1, 2], &[]), cc(&[p1(), p!(2, Online -> Expelled)], &[1, 2], &[]),
// If a voter starts getting expelled, it becomes a learner
cc![AddLearnerNode(2)]
);
assert_eq!(
cc(&[p1(), p!(2, Online -> Expelled)], &[1], &[2]),
// If a learner starts getting expelled, nothing happens
None
);
assert_eq!(
cc(&[p1(), p!(2, Expelled -> Expelled)], &[1, 2], &[]),
// Expelled voters should be removed // Expelled voters should be removed
cc![RemoveNode(2)] cc![RemoveNode(2)]
); );
assert_eq!( assert_eq!(
cc(&[p1(), p!(2, Offline -> Expelled)], &[1], &[2]), cc(&[p1(), p!(2, Expelled -> Expelled)], &[1], &[2]),
// Expelled learners are removed too // Expelled learners are removed too
cc![RemoveNode(2)] cc![RemoveNode(2)]
); );
assert_eq!( assert_eq!(
cc( cc(
&[p1(), p2(), p3(), p4(), p!(5, Online -> Expelled)], &[p1(), p2(), p3(), p4(), p!(5, Expelled -> Expelled)],
&[1, 2, 3, 4, 5], &[1, 2, 3, 4, 5],
&[] &[]
), ),
......
...@@ -35,6 +35,11 @@ pub async fn callback() { ...@@ -35,6 +35,11 @@ pub async fn callback() {
} }
); );
if has_states!(me, Expelled -> *) {
tlog!(Info, "instance has been expelled");
break;
}
if has_states!(me, Offline -> *) { if has_states!(me, Offline -> *) {
tlog!(Info, "graceful shutdown succeeded"); tlog!(Info, "graceful shutdown succeeded");
......
...@@ -58,6 +58,11 @@ impl Loop { ...@@ -58,6 +58,11 @@ impl Loop {
return ControlFlow::Continue(()); return ControlFlow::Continue(());
}; };
if has_states!(instance, * -> Expelled) {
tlog!(Debug, "instance has been expelled, sentinel out");
return ControlFlow::Break(());
}
let req = rpc::update_instance::Request::new(instance.instance_id, cluster_id) let req = rpc::update_instance::Request::new(instance.instance_id, cluster_id)
.with_target_state(Offline); .with_target_state(Offline);
......
...@@ -1232,6 +1232,13 @@ class Instance: ...@@ -1232,6 +1232,13 @@ class Instance:
else: else:
raise ProcessDead(f"process exited unexpectedly, {exit_code=}") raise ProcessDead(f"process exited unexpectedly, {exit_code=}")
def assert_process_dead(self):
try:
self.check_process_alive()
assert False
except ProcessDead:
pass
def wait_online( def wait_online(
self, timeout: int | float = 30, rps: int | float = 5, expected_incarnation=None self, timeout: int | float = 30, rps: int | float = 5, expected_incarnation=None
): ):
......
import pytest import pytest
from conftest import Cluster, Instance, Retriable, pid_alive from conftest import Cluster, Instance, Retriable
@pytest.fixture @pytest.fixture
...@@ -20,10 +20,6 @@ def assert_voters(voters: list[Instance], instance: Instance): ...@@ -20,10 +20,6 @@ def assert_voters(voters: list[Instance], instance: Instance):
assert sorted(actual_voters) == sorted(expected_voters) assert sorted(actual_voters) == sorted(expected_voters)
def assert_pid_down(pid):
assert not pid_alive(pid)
def test_expel_follower(cluster3: Cluster): def test_expel_follower(cluster3: Cluster):
# Scenario: expel a Follower instance by command to Leader # Scenario: expel a Follower instance by command to Leader
# Given a cluster # Given a cluster
...@@ -42,7 +38,7 @@ def test_expel_follower(cluster3: Cluster): ...@@ -42,7 +38,7 @@ def test_expel_follower(cluster3: Cluster):
Retriable(timeout=10).call(lambda: assert_voters([i1, i2], i1)) Retriable(timeout=10).call(lambda: assert_voters([i1, i2], i1))
# assert i3.process # assert i3.process
# Retriable(timeout=10).call(lambda: assert_pid_down(i3.process.pid)) Retriable(timeout=10).call(i3.assert_process_dead)
def test_expel_leader(cluster3: Cluster): def test_expel_leader(cluster3: Cluster):
...@@ -63,7 +59,7 @@ def test_expel_leader(cluster3: Cluster): ...@@ -63,7 +59,7 @@ def test_expel_leader(cluster3: Cluster):
Retriable(timeout=10).call(lambda: assert_voters([i2, i3], i2)) Retriable(timeout=10).call(lambda: assert_voters([i2, i3], i2))
# assert i1.process # assert i1.process
# Retriable(timeout=10).call(lambda: assert_pid_down(i1.process.pid)) Retriable(timeout=10).call(i1.assert_process_dead)
def test_expel_by_follower(cluster3: Cluster): def test_expel_by_follower(cluster3: Cluster):
...@@ -84,7 +80,7 @@ def test_expel_by_follower(cluster3: Cluster): ...@@ -84,7 +80,7 @@ def test_expel_by_follower(cluster3: Cluster):
Retriable(timeout=10).call(lambda: assert_voters([i1, i2], i1)) Retriable(timeout=10).call(lambda: assert_voters([i1, i2], i1))
# assert i3.process # assert i3.process
# Retriable(timeout=10).call(lambda: assert_pid_down(i3.process.pid)) Retriable(timeout=10).call(i3.assert_process_dead)
def test_raft_id_after_expel(cluster: Cluster): def test_raft_id_after_expel(cluster: Cluster):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment