From f9d0e30d0d5028c3a03619670d2715aec8fabfc3 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Mon, 2 Sep 2024 17:48:30 +0300
Subject: [PATCH] fix: expelled instance used to not know it was expelled

---
 src/governor/conf_change.rs | 39 ++++++++++++++++++++++++++++---------
 src/on_shutdown.rs          |  5 +++++
 src/sentinel.rs             |  5 +++++
 test/conftest.py            |  7 +++++++
 test/int/test_expelling.py  | 12 ++++--------
 5 files changed, 51 insertions(+), 17 deletions(-)

diff --git a/src/governor/conf_change.rs b/src/governor/conf_change.rs
index 717c5708ed..da046dbc47 100644
--- a/src/governor/conf_change.rs
+++ b/src/governor/conf_change.rs
@@ -5,6 +5,12 @@ use crate::{has_states, tlog};
 use ::raft::prelude as raft;
 use ::raft::prelude::ConfChangeType::*;
 use std::collections::HashMap;
+// TODO: do not use BTreeSet, it's very bad for performance.
+// Instead we should use a sorted array. All operations will be faster because
+// of cache locality. Inserting into/removing from a sorted array is simple:
+// find the location using binary search and shift the tail of the array.
+// For our purposes this is going to be much more performant that the binary tree,
+// because there will be dramatically fewer memory allocations and cache misses.
 use std::collections::{BTreeMap, BTreeSet};
 
 struct RaftConf<'a> {
@@ -138,13 +144,16 @@ pub(crate) fn raft_conf_change(
             continue;
         }
 
-        if has_states!(instance, * -> Expelled) {
-            // Expelled instance is removed unconditionally.
+        if has_states!(instance, Expelled -> *) {
+            // Instance was already expelled => remove it unconditionally.
             let ccs = raft_conf.change_single(RemoveNode, instance.raft_id);
             changes.push(ccs);
-        } else if has_states!(instance, * -> Offline) {
-            // A voter goes offline. Replace it with
-            // another online instance if possible.
+            continue;
+        }
+
+        if has_states!(instance, * -> Offline) || has_states!(instance, * -> Expelled) {
+            // A voter is shutting down or getting expelled.
+            // Replace it with another online instance if possible.
             let Some((next_voter_id, _)) = next_farthest(&raft_conf, &promotable) else {
                 continue;
             };
@@ -174,7 +183,7 @@ pub(crate) fn raft_conf_change(
             continue;
         };
 
-        if has_states!(instance, * -> Expelled) {
+        if has_states!(instance, Expelled -> *) {
             // Instance was already expelled => remove it unconditionally.
             let ccs = raft_conf.change_single(RemoveNode, instance.raft_id);
             changes.push(ccs);
@@ -213,7 +222,7 @@ pub(crate) fn raft_conf_change(
 
     // Promote remaining instances as learners
     for instance in instances {
-        if has_states!(instance, * -> Expelled)
+        if has_states!(instance, Expelled -> *)
             || raft_conf.voters.contains(&instance.raft_id)
             || raft_conf.learners.contains(&instance.raft_id)
         {
@@ -492,19 +501,31 @@ mod tests {
 
         assert_eq!(
             cc(&[p1(), p!(2, Online -> Expelled)], &[1, 2], &[]),
+            // If a voters starts getting expelled, it becomes a learner
+            cc![AddLearnerNode(2)]
+        );
+
+        assert_eq!(
+            cc(&[p1(), p!(2, Online -> Expelled)], &[1], &[2]),
+            // If a learner starts getting expelled, nothing happens
+            None
+        );
+
+        assert_eq!(
+            cc(&[p1(), p!(2, Expelled -> Expelled)], &[1, 2], &[]),
             // Expelled voters should be removed
             cc![RemoveNode(2)]
         );
 
         assert_eq!(
-            cc(&[p1(), p!(2, Offline -> Expelled)], &[1], &[2]),
+            cc(&[p1(), p!(2, Expelled -> Expelled)], &[1], &[2]),
             // Expelled learners are removed too
             cc![RemoveNode(2)]
         );
 
         assert_eq!(
             cc(
-                &[p1(), p2(), p3(), p4(), p!(5, Online -> Expelled)],
+                &[p1(), p2(), p3(), p4(), p!(5, Expelled -> Expelled)],
                 &[1, 2, 3, 4, 5],
                 &[]
             ),
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index cb8ad558b5..3c0cd1ccb2 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -35,6 +35,11 @@ pub async fn callback() {
             }
         );
 
+        if has_states!(me, Expelled -> *) {
+            tlog!(Info, "instance has been expelled");
+            break;
+        }
+
         if has_states!(me, Offline -> *) {
             tlog!(Info, "graceful shutdown succeeded");
 
diff --git a/src/sentinel.rs b/src/sentinel.rs
index b01cd67b3c..2b1301387f 100644
--- a/src/sentinel.rs
+++ b/src/sentinel.rs
@@ -58,6 +58,11 @@ impl Loop {
                 return ControlFlow::Continue(());
             };
 
+            if has_states!(instance, * -> Expelled) {
+                tlog!(Debug, "instance has been expelled, sentinel out");
+                return ControlFlow::Break(());
+            }
+
             let req = rpc::update_instance::Request::new(instance.instance_id, cluster_id)
                 .with_target_state(Offline);
 
diff --git a/test/conftest.py b/test/conftest.py
index 58669595a1..c87dea6944 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1232,6 +1232,13 @@ class Instance:
         else:
             raise ProcessDead(f"process exited unexpectedly, {exit_code=}")
 
+    def assert_process_dead(self):
+        try:
+            self.check_process_alive()
+            assert False
+        except ProcessDead:
+            pass
+
     def wait_online(
         self, timeout: int | float = 30, rps: int | float = 5, expected_incarnation=None
     ):
diff --git a/test/int/test_expelling.py b/test/int/test_expelling.py
index b246909cbe..7f02364c25 100644
--- a/test/int/test_expelling.py
+++ b/test/int/test_expelling.py
@@ -1,5 +1,5 @@
 import pytest
-from conftest import Cluster, Instance, Retriable, pid_alive
+from conftest import Cluster, Instance, Retriable
 
 
 @pytest.fixture
@@ -20,10 +20,6 @@ def assert_voters(voters: list[Instance], instance: Instance):
     assert sorted(actual_voters) == sorted(expected_voters)
 
 
-def assert_pid_down(pid):
-    assert not pid_alive(pid)
-
-
 def test_expel_follower(cluster3: Cluster):
     # Scenario: expel a Follower instance by command to Leader
     #   Given a cluster
@@ -42,7 +38,7 @@ def test_expel_follower(cluster3: Cluster):
     Retriable(timeout=10).call(lambda: assert_voters([i1, i2], i1))
 
     # assert i3.process
-    # Retriable(timeout=10).call(lambda: assert_pid_down(i3.process.pid))
+    Retriable(timeout=10).call(i3.assert_process_dead)
 
 
 def test_expel_leader(cluster3: Cluster):
@@ -63,7 +59,7 @@ def test_expel_leader(cluster3: Cluster):
     Retriable(timeout=10).call(lambda: assert_voters([i2, i3], i2))
 
     # assert i1.process
-    # Retriable(timeout=10).call(lambda: assert_pid_down(i1.process.pid))
+    Retriable(timeout=10).call(i1.assert_process_dead)
 
 
 def test_expel_by_follower(cluster3: Cluster):
@@ -84,7 +80,7 @@ def test_expel_by_follower(cluster3: Cluster):
     Retriable(timeout=10).call(lambda: assert_voters([i1, i2], i1))
 
     # assert i3.process
-    # Retriable(timeout=10).call(lambda: assert_pid_down(i3.process.pid))
+    Retriable(timeout=10).call(i3.assert_process_dead)
 
 
 def test_raft_id_after_expel(cluster: Cluster):
-- 
GitLab