From ca20f2c9ab3e2e28904227486023a89abf2f78a3 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 18 Nov 2022 11:42:34 +0300
Subject: [PATCH] fix(expel): set current grade in governor

---
 src/traft/node.rs          | 36 +++++++++++++++++++++++++++++++++---
 src/traft/rpc/expel.rs     |  4 +---
 src/traft/topology.rs      | 12 +++++++++++-
 test/int/test_expelling.py |  8 +++-----
 4 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/src/traft/node.rs b/src/traft/node.rs
index 6f31fd0081..fe2197cf17 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -284,6 +284,14 @@ impl Node {
         notify.recv()
     }
 
+    /// Request a leadership transfer to `new_leader_id`, then yield; no outcome is returned.
+    ///
+    /// **This function yields** (`raw_operation` may additionally yield on the mutex).
+    pub fn transfer_leadership_and_yield(&self, new_leader_id: RaftId) {
+        self.raw_operation(|node_impl| node_impl.raw_node.transfer_leader(new_leader_id));
+        fiber::reschedule();
+    }
+
     /// This function **may yield** if `self.node_impl` mutex is acquired.
     #[inline]
     fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
@@ -955,12 +963,17 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
         }
 
         ////////////////////////////////////////////////////////////////////////
-        // offline
+        // offline/expel
         let to_offline = peers
             .iter()
             .filter(|peer| peer.current_grade != CurrentGradeVariant::Offline)
             // TODO: process them all, not just the first one
-            .find(|peer| peer.target_grade == TargetGradeVariant::Offline);
+            .find(|peer| {
+                let (target, current) = (peer.target_grade.variant, peer.current_grade.variant);
+                matches!(target, TargetGradeVariant::Offline)
+                    || !matches!(current, CurrentGradeVariant::Expelled)
+                        && matches!(target, TargetGradeVariant::Expelled)
+            });
         if let Some(peer) = to_offline {
             tlog!(
                 Info,
@@ -970,6 +983,23 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 peer.target_grade
             );
 
+            // transfer leadership, if we're the one who goes offline or gets expelled
+            if peer.raft_id == node.raft_id {
+                if let Some(new_leader) = maybe_responding(&peers).find(|peer| {
+                    // FIXME: linear search
+                    voters.contains(&peer.raft_id)
+                }) {
+                    tlog!(
+                        Info,
+                        "transferring leadership to {}",
+                        new_leader.instance_id
+                    );
+                    node.transfer_leadership_and_yield(new_leader.raft_id);
+                    event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
+                    continue 'governor;
+                }
+            }
+
             let replicaset_id = &peer.replicaset_id;
             // choose a new replicaset master if needed
             let res = (|| -> traft::Result<_> {
@@ -1040,7 +1070,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
             // update peer's CurrentGrade
             let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id.clone())
-                .with_current_grade(CurrentGrade::offline(peer.target_grade.incarnation));
+                .with_current_grade(peer.target_grade.into());
             tlog!(Info,
                 "handling UpdatePeerRequest";
                 "current_grade" => %req.current_grade.expect("just set"),
diff --git a/src/traft/rpc/expel.rs b/src/traft/rpc/expel.rs
index 2698e3dd95..d4018a70d5 100644
--- a/src/traft/rpc/expel.rs
+++ b/src/traft/rpc/expel.rs
@@ -23,9 +23,7 @@ crate::define_rpc_request! {
         }
 
         let req2 = UpdatePeerRequest::new(req.instance_id, req.cluster_id)
-            .with_target_grade(traft::TargetGradeVariant::Expelled)
-            // TODO: only change target grade
-            .with_current_grade(traft::CurrentGrade::expelled(0));
+            .with_target_grade(traft::TargetGradeVariant::Expelled);
         node.handle_topology_request_and_wait(req2.into())?;
 
         Ok(Response {})
diff --git a/src/traft/topology.rs b/src/traft/topology.rs
index f66978a0da..0a03c48651 100644
--- a/src/traft/topology.rs
+++ b/src/traft/topology.rs
@@ -195,7 +195,17 @@ impl Topology {
             .get_mut(&req.instance_id)
             .ok_or_else(|| format!("unknown instance {}", req.instance_id))?;
 
-        if peer.current_grade == CurrentGradeVariant::Expelled {
+        if peer.current_grade == CurrentGradeVariant::Expelled
+            && !matches!(
+                req,
+                UpdatePeerRequest {
+                    target_grade: None,
+                    current_grade: Some(current_grade),
+                    failure_domain: None,
+                    ..
+                } if current_grade == CurrentGradeVariant::Expelled
+            )
+        {
             return Err(format!(
                 "cannot update expelled peer \"{}\"",
                 peer.instance_id
diff --git a/test/int/test_expelling.py b/test/int/test_expelling.py
index e04d754796..2e28f50bae 100644
--- a/test/int/test_expelling.py
+++ b/test/int/test_expelling.py
@@ -9,11 +9,9 @@ def cluster3(cluster: Cluster):
 
 
 def assert_peer_expelled(expelled_peer: Instance, instance: Instance):
-    current_grade = instance.eval(
-        "return picolib.peer_info(...).current_grade.variant",
-        expelled_peer.instance_id,
-    )
-    assert current_grade == "Expelled"
+    peer_info = instance.call("picolib.peer_info", expelled_peer.instance_id)
+    grades = peer_info["current_grade"]["variant"], peer_info["target_grade"]["variant"]
+    assert ("Expelled", "Expelled") == grades
 
 
 def assert_voters(voters: list[Instance], instance: Instance):
-- 
GitLab