From 46567ebcba68f0f8f4e32d1619fdb1c358c83530 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 30 Jun 2022 19:44:17 +0300
Subject: [PATCH] feat(switchover): implement voter switchover rules
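
Keep the set of raft voters in sync with the health of the cluster.
On every iteration of the conf change loop the leader now:

* demotes voters which are no longer active,
* promotes active learners and newly joined peers to voters until the
  expected number of voters is reached,
* adds the remaining new peers as learners.

The expected number of voters depends on the total number of active
instances:

    total active | voters
    -------------+-------
    1            | 1
    2            | 2
    3..4         | 3
    5..          | 5

On graceful shutdown an instance reports itself "Offline" to the
leader and, if it used to be a voter, waits for the new "raft.demoted"
event (postponed until the joint state is left) before exiting. On
postjoin every instance activates itself by sending a SetActiveRequest
to the leader. The "is_active" column of the raft_group space is
renamed to "health".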

---
 src/main.rs             |  67 ++++++++++++--------------
 src/traft/event.rs      |  27 +++++++++++
 src/traft/failover.rs   |  53 ++++++++++++++++++---
 src/traft/mod.rs        |   2 +-
 src/traft/node.rs       | 102 ++++++++++++++++++++++++++++++++++------
 src/traft/storage.rs    |  20 +++++++-
 test/int/test_couple.py |  75 +++++++++++++++++++----------
 7 files changed, 261 insertions(+), 85 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 851940cbe3..d5281e719f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,7 +10,7 @@ use ::tarantool::fiber;
 use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
 use std::convert::TryFrom;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use clap::StructOpt as _;
 use protobuf::Message as _;
@@ -604,42 +604,35 @@ fn postjoin(args: &args::Run) {
     box_cfg.replication = traft::Storage::box_replication(&peer.replicaset_id, None).unwrap();
     tarantool::set_cfg(&box_cfg);
 
-    // loop {
-    //     let timeout = Duration::from_millis(220);
-    //     let me = traft::Storage::peer_by_raft_id(raft_id)
-    //         .unwrap()
-    //         .expect("peer not found");
-
-    //     if me.active && me.peer_address == args.advertise_address() {
-    //         // already ok
-    //         break;
-    //     }
-
-    //     tlog!(Warning, "initiating self-promotion of {me:?}");
-    //     let req = traft::JoinRequest {
-    //         cluster_id: args.cluster_id.clone(),
-    //         instance_id: Some(me.instance_id.clone()),
-    //         replicaset_id: None, // TODO
-    //         voter: true,
-    //         advertise_address: args.advertise_address(),
-    //     };
-
-    //     let leader_id = node.status().leader_id.expect("leader_id deinitialized");
-    //     let leader = traft::Storage::peer_by_raft_id(leader_id).unwrap().unwrap();
-
-    //     let fn_name = stringify_cfunc!(traft::node::raft_join);
-    //     let now = Instant::now();
-    //     match tarantool::net_box_call(&leader.peer_address, fn_name, &req, timeout) {
-    //         Err(e) => {
-    //             tlog!(Error, "failed to promote myself: {e}");
-    //             fiber::sleep(timeout.saturating_sub(now.elapsed()));
-    //             continue;
-    //         }
-    //         Ok(traft::JoinResponse { .. }) => {
-    //             break;
-    //         }
-    //     };
-    // }
+    loop {
+        let instance_id = traft::Storage::peer_by_raft_id(raft_id)
+            .unwrap()
+            .expect("peer must be persisted at the time of postjoin")
+            .instance_id;
+        let cluster_id = traft::Storage::cluster_id()
+            .unwrap()
+            .expect("cluster_id must be persisted at the time of postjoin");
+
+        tlog!(Info, "initiating self-activation of {instance_id:?}");
+        let req = traft::SetActiveRequest::activate(instance_id, cluster_id);
+
+        let leader_id = node.status().leader_id.expect("leader_id deinitialized");
+        let leader = traft::Storage::peer_by_raft_id(leader_id).unwrap().unwrap();
+
+        let fn_name = stringify_cfunc!(traft::failover::raft_set_active);
+        let now = Instant::now();
+        let timeout = Duration::from_millis(220);
+        match tarantool::net_box_call(&leader.peer_address, fn_name, &req, timeout) {
+            Err(e) => {
+                tlog!(Warning, "failed to activate myself: {e}");
+                fiber::sleep(timeout.saturating_sub(now.elapsed()));
+                continue;
+            }
+            Ok(traft::SetActiveResponse { .. }) => {
+                break;
+            }
+        };
+    }
 
     node.mark_as_ready();
 }
diff --git a/src/traft/event.rs b/src/traft/event.rs
index a74e12218a..daaf98eefa 100644
--- a/src/traft/event.rs
+++ b/src/traft/event.rs
@@ -56,6 +56,8 @@ macro_rules! define_events {
 }
 
 define_events! {
+    // this instance was demoted from voter to learner
+    Demoted, "raft.demoted";
+    // a raft configuration change was finalized (the joint state was left)
+    LeaveJointState, "raft.leave-joint-state";
     StatusChanged, "raft.status-changed";
     TopologyChanged, "raft.topology-changed";
 }
@@ -173,6 +175,31 @@ pub fn broadcast(event: impl Borrow<Event>) {
     }
 }
 
+/// Postpones the `postpone` event until the `until` event happens:
+/// adds a one-shot event handler which will broadcast the `postpone`
+/// event as soon as `until` happens.
+///
+/// **NOTE**: the postponement is volatile, so if the instance restarts
+/// before the `until` event happens, there will be no notification.
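+///
+/// For example, this is how `node.rs` delays the `Demoted` notification
+/// until the conf change is finalized:
+///
+/// ```ignore
+/// event::postpone_until(Event::Demoted, Event::LeaveJointState).ok();
+/// ```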
+///
+/// Returns an error if `EVENTS` is uninitialized.
+pub fn postpone_until(postpone: Event, until: Event) -> Result<(), Error> {
+    let mut events = events()?;
+    let cond = events.regular_cond(postpone);
+    events.add_once_handler(
+        until,
+        handler(move || {
+            cond.broadcast();
+            Ok(())
+        }),
+    );
+    // events must be released before yielding
+    drop(events);
+    Ok(())
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 /// Struct that handles an event
 pub struct Handler {
diff --git a/src/traft/failover.rs b/src/traft/failover.rs
index 1042b9b3ba..2d76a3835f 100644
--- a/src/traft/failover.rs
+++ b/src/traft/failover.rs
@@ -1,21 +1,23 @@
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
-use ::tarantool::fiber;
+use ::tarantool::fiber::sleep;
 use ::tarantool::proc;
 
 use crate::{stringify_cfunc, tarantool, tlog};
 
+use crate::traft::event;
 use crate::traft::node;
 use crate::traft::node::Error;
 use crate::traft::Storage;
 use crate::traft::{SetActiveRequest, SetActiveResponse};
 
 pub fn on_shutdown() {
-    let voters = Storage::voters().expect("failed reading 'voters'");
+    let voters = Storage::voters().expect("failed reading voters");
+    let active_learners = Storage::active_learners().expect("failed reading active learners");
     let raft_id = node::global().unwrap().status().id;
-    // raft will not let us have a cluster with no voters anyway
-    if !voters.contains(&raft_id) || voters.len() == 1 {
-        tlog!(Info, "not demoting");
+
+    if voters == [raft_id] && active_learners.is_empty() {
+        tlog!(Warning, "the last active instance is shutting down");
         return;
     }
 
@@ -34,6 +36,8 @@ pub fn on_shutdown() {
         let status = node::global().unwrap().status();
         let leader_id = status.leader_id.expect("leader_id deinitialized");
         let leader = Storage::peer_by_raft_id(leader_id).unwrap().unwrap();
+        let wait_before_retry = Duration::from_millis(300);
+        let now = Instant::now();
 
         match tarantool::net_box_call(&leader.peer_address, fn_name, &req, Duration::MAX) {
             Err(e) => {
@@ -41,7 +45,7 @@ pub fn on_shutdown() {
                     "peer" => &leader.peer_address,
                     "fn" => fn_name,
                 );
-                fiber::sleep(Duration::from_millis(100));
+                sleep(wait_before_retry.saturating_sub(now.elapsed()));
                 continue;
             }
             Ok(SetActiveResponse { .. }) => {
@@ -49,6 +53,15 @@ pub fn on_shutdown() {
             }
         };
     }
+
+    // no need to wait for demotion if we weren't a voter
+    if !voters.contains(&raft_id) {
+        return;
+    }
+
+    if let Err(e) = event::wait(event::Event::Demoted) {
+        tlog!(Warning, "failed to wait for self demotion: {e}");
+    }
 }
 
 #[proc(packed_args)]
@@ -67,3 +80,29 @@ fn raft_set_active(req: SetActiveRequest) -> Result<SetActiveResponse, Box<dyn s
     let peer = node.handle_topology_request(req.into())?;
     Ok(SetActiveResponse { peer })
 }
+
+/// Returns how many instances should be promoted to raft voters (positive)
+/// or demoted to learners (negative) for a cluster with `total` active
+/// instances, `voters` of which are currently voters.
+pub fn voters_needed(voters: usize, total: usize) -> i64 {
+    let voters_expected = match total {
+        1 => 1,
+        2 => 2,
+        3..=4 => 3,
+        5.. => 5,
+        _ => unreachable!(),
+    };
+    voters_expected - (voters as i64)
+}
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn voters_needed() {
+        assert_eq!(super::voters_needed(0, 1), 1);
+        assert_eq!(super::voters_needed(1, 1), 0);
+        assert_eq!(super::voters_needed(2, 1), -1);
+        assert_eq!(super::voters_needed(0, 2), 2);
+        assert_eq!(super::voters_needed(2, 3), 1);
+        assert_eq!(super::voters_needed(6, 4), -3);
+        assert_eq!(super::voters_needed(1, 5), 4);
+        assert_eq!(super::voters_needed(1, 999), 4);
+    }
+}
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 7dde17c8c3..ded5a0d517 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -460,8 +460,8 @@ impl Default for Health {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct SetActiveRequest {
     pub kind: Health,
-    pub cluster_id: String,
     pub instance_id: String,
+    pub cluster_id: String,
 }
 impl AsTuple for SetActiveRequest {}
 
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 9c131e2dcc..375a892ff1 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -36,6 +36,7 @@ use crate::tlog;
 use crate::traft;
 use crate::traft::event;
 use crate::traft::event::Event;
+use crate::traft::failover;
 use crate::traft::ConnectionPool;
 use crate::traft::LogicalClock;
 use crate::traft::Storage;
@@ -442,6 +443,7 @@ fn handle_committed_conf_change(
         if let Some(latch) = joint_state_latch {
             latch.notify.notify_ok(());
             *joint_state_latch = None;
+            event::broadcast(Event::LeaveJointState);
         }
     };
 
@@ -480,6 +482,12 @@ fn handle_committed_conf_change(
         _ => unreachable!(),
     };
 
+    let raft_id = &raw_node.raft.id;
+    let voters_old = Storage::voters().unwrap();
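+    // If this instance is being demoted from voter to learner, postpone
+    // the `Demoted` notification until the joint state is left and the
+    // conf change is finalized.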
+    if voters_old.contains(raft_id) && !conf_state.voters.contains(raft_id) {
+        event::postpone_until(Event::Demoted, Event::LeaveJointState).ok();
+    }
+
     Storage::persist_conf_state(&conf_state).unwrap();
 }
 
@@ -863,27 +871,80 @@ fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<Normal
         }
 
         let term = Storage::term().unwrap().unwrap_or(0);
-        let conf_state = Storage::conf_state().unwrap();
-        let voters: HashSet<RaftId> = HashSet::from_iter(conf_state.voters);
-        let learners: HashSet<RaftId> = HashSet::from_iter(conf_state.learners);
-        let everybody: HashSet<RaftId> = voters.union(&learners).cloned().collect();
-        let peers: HashMap<RaftId, bool> = Storage::peers()
+        let voter_ids: HashSet<RaftId> = HashSet::from_iter(Storage::voters().unwrap());
+        let learner_ids: HashSet<RaftId> = HashSet::from_iter(Storage::learners().unwrap());
+        let peer_is_active: HashMap<RaftId, bool> = Storage::peers()
             .unwrap()
-            .iter()
+            .into_iter()
             .map(|peer| (peer.raft_id, peer.is_active()))
             .collect();
+
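+        // Split the peers into groups: inactive voters must be demoted,
+        // while active learners and peers not yet in the raft group are
+        // candidates for promotion.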
+        let (active_voters, to_demote): (Vec<RaftId>, Vec<RaftId>) = voter_ids
+            .iter()
+            .partition(|id| peer_is_active.get(id).copied().unwrap_or(false));
+
+        let active_learners: Vec<RaftId> = learner_ids
+            .iter()
+            .copied()
+            .filter(|id| peer_is_active.get(id).copied().unwrap_or(false))
+            .collect();
+
+        let new_peers: Vec<RaftId> = peer_is_active
+            .iter()
+            .map(|(&id, _)| id)
+            .filter(|id| !voter_ids.contains(id) && !learner_ids.contains(id))
+            .collect();
+
         let mut changes: Vec<raft::ConfChangeSingle> = Vec::new();
 
-        for (node_id, _active) in peers {
-            if everybody.contains(&node_id) {
-                continue;
+        const VOTER: bool = true;
+        const LEARNER: bool = false;
+
+        changes.extend(
+            to_demote
+                .into_iter()
+                .map(|id| conf_change_single(id, LEARNER)),
+        );
+
+        let total_active = active_voters.len() + active_learners.len() + new_peers.len();
+
+        let new_peers_to_promote;
+        match failover::voters_needed(active_voters.len(), total_active) {
+            0 => {
+                new_peers_to_promote = 0;
+            }
+            pos @ 1..=i64::MAX => {
+                let pos = pos as usize;
+                tlog!(Info, "promoting {pos} instances to voters");
+                if pos < active_learners.len() {
+                    for &raft_id in &active_learners[0..pos] {
+                        changes.push(conf_change_single(raft_id, VOTER))
+                    }
+                    new_peers_to_promote = 0;
+                } else {
+                    for &raft_id in &active_learners {
+                        changes.push(conf_change_single(raft_id, VOTER))
+                    }
+                    new_peers_to_promote = pos - active_learners.len();
+                    assert!(new_peers_to_promote <= new_peers.len());
+                    for &raft_id in &new_peers[0..new_peers_to_promote] {
+                        changes.push(conf_change_single(raft_id, VOTER))
+                    }
+                }
             }
+            neg @ i64::MIN..=-1 => {
+                let neg = -neg as usize;
+                tlog!(Info, "demoting {neg} voters to learners");
+                assert!(neg < active_voters.len());
+                for &raft_id in &active_voters[0..neg] {
+                    changes.push(conf_change_single(raft_id, LEARNER))
+                }
+                new_peers_to_promote = 0;
+            }
+        }
 
-            changes.push(raft::ConfChangeSingle {
-                change_type: raft::ConfChangeType::AddNode,
-                node_id,
-                ..Default::default()
-            });
+        for &raft_id in &new_peers[new_peers_to_promote..] {
+            changes.push(conf_change_single(raft_id, LEARNER))
         }
 
         if changes.is_empty() {
@@ -915,6 +976,19 @@ fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<Normal
     }
 }
 
+fn conf_change_single(node_id: RaftId, is_voter: bool) -> raft::ConfChangeSingle {
+    let change_type = if is_voter {
+        raft::ConfChangeType::AddNode
+    } else {
+        raft::ConfChangeType::AddLearnerNode
+    };
+    raft::ConfChangeSingle {
+        change_type,
+        node_id,
+        ..Default::default()
+    }
+}
+
 static mut RAFT_NODE: Option<Box<Node>> = None;
 
 pub fn set_global(node: Node) {
diff --git a/src/traft/storage.rs b/src/traft/storage.rs
index 7091ce29a8..0d28cb6bd4 100644
--- a/src/traft/storage.rs
+++ b/src/traft/storage.rs
@@ -21,12 +21,15 @@ const RAFT_GROUP: &str = "raft_group";
 const RAFT_STATE: &str = "raft_state";
 const RAFT_LOG: &str = "raft_log";
 
+#[allow(clippy::enum_variant_names)]
 #[derive(Debug, Error)]
 enum Error {
     #[error("no such space \"{0}\"")]
     NoSuchSpace(String),
     #[error("no such index \"{1}\" in space \"{0}\"")]
     NoSuchIndex(String, String),
+    #[error("no peer with id {0}")]
+    NoSuchPeer(RaftId),
 }
 
 macro_rules! box_err {
@@ -80,7 +83,7 @@ impl Storage {
                     {name = 'replicaset_id', type = 'string', is_nullable = false},
                     {name = 'replicaset_uuid', type = 'string', is_nullable = false},
                     {name = 'commit_index', type = 'unsigned', is_nullable = false},
-                    {name = 'is_active', type = 'string', is_nullable = false},
+                    {name = 'health', type = 'string', is_nullable = false},
                 }
             })
 
@@ -345,6 +348,21 @@ impl Storage {
         Ok(Storage::raft_state("learners")?.unwrap_or_default())
     }
 
+    pub fn active_learners() -> Result<Vec<RaftId>, StorageError> {
+        let learners = Storage::learners()?;
+        let mut res = Vec::with_capacity(learners.len());
+        for raft_id in learners {
+            if Storage::peer_by_raft_id(raft_id)?
+                .ok_or(Error::NoSuchPeer(raft_id))
+                .map_err(box_err!())?
+                .is_active()
+            {
+                res.push(raft_id)
+            }
+        }
+        Ok(res)
+    }
+
     pub fn conf_state() -> Result<raft::ConfState, StorageError> {
         Ok(raft::ConfState {
             voters: Storage::voters()?,
diff --git a/test/int/test_couple.py b/test/int/test_couple.py
index f6aecf24c6..00c268e61d 100644
--- a/test/int/test_couple.py
+++ b/test/int/test_couple.py
@@ -107,32 +107,47 @@ def test_restart_both(cluster2: Cluster):
 def test_deactivation(cluster2: Cluster):
     i1, i2 = cluster2.instances
 
-    def is_voter_is_active(instance: Instance):
-        code = """
-            function table_find(tbl, val)
-                for _, v in pairs(tbl) do
-                    if v == val then
-                        return true
+    def is_voter_is_active(instance: Instance, raft_id: int):
+        return tuple(
+            instance.eval(
+                """
+                    raft_id = ...
+                    health = box.space.raft_group.index.raft_id:get(raft_id).health
+                    is_active = health == 'Online'
+                    voters = box.space.raft_state:get('voters').value
+                    for _, voter in pairs(voters) do
+                        if voter == raft_id then
+                            return { true, is_active }
+                        end
                     end
-                end
-                return false
-            end
+                    return { false, is_active }
+                """,
+                raft_id,
+            )
+        )
 
-            local peer = box.space.raft_group:get(...)
-            local voters = box.space.raft_state:get("voters").value
-            return { table_find(voters, peer.raft_id), peer.is_active }
-        """
-        return tuple(instance.eval(code, instance.instance_id))
+    assert is_voter_is_active(i1, i1.raft_id) == (True, True)
+    assert is_voter_is_active(i2, i2.raft_id) == (True, True)
 
-    assert is_voter_is_active(i1) == (True, "Online")
-    assert is_voter_is_active(i2) == (True, "Online")
+    i2.terminate()
+
+    assert is_voter_is_active(i1, i1.raft_id) == (True, True)
+    assert is_voter_is_active(i1, i2.raft_id) == (False, False)
+
+    i2.start()
+    i2.wait_ready()
+
+    assert is_voter_is_active(i1, i1.raft_id) == (True, True)
+    assert is_voter_is_active(i2, i2.raft_id) == (True, True)
 
     i1.terminate()
 
-    pytest.xfail("Refactoring broke voters auto demotion")
+    assert is_voter_is_active(i2, i1.raft_id) == (False, False)
+    assert is_voter_is_active(i2, i2.raft_id) == (True, True)
 
-    assert is_voter_is_active(i2) == (False, "Offline")
-    assert is_voter_is_active(i2) == (True, "Online")
+    # wait until i2 is leader, so it has someone to send the deactivation
+    # request to
+    i2.promote_or_fail()
 
     i2.terminate()
 
@@ -142,12 +157,22 @@ def test_deactivation(cluster2: Cluster):
     i1.wait_ready()
     i2.wait_ready()
 
-    assert is_voter_is_active(i1) == (True, "Online")
-    assert is_voter_is_active(i2) == (True, "Online")
+    assert is_voter_is_active(i1, i1.raft_id) == (True, True)
+    assert is_voter_is_active(i2, i2.raft_id) == (True, True)
 
-    i1.promote_or_fail()
+    i1.terminate()
 
-    i2.terminate()
+    assert is_voter_is_active(i2, i1.raft_id) == (False, False)
+    assert is_voter_is_active(i2, i2.raft_id) == (True, True)
+
+    def raft_set_active(host: Instance, target: Instance, is_active: bool) -> list[bool]:
+        kind = "Online" if is_active else "Offline"
+        resps = host.call(".raft_set_active", kind, target.instance_id, target.cluster_id)
+        return [resp["peer"]["health"] == "Online" for resp in resps]
+
+    # check idempotency
+    assert raft_set_active(i2, target=i1, is_active=False) == [False]
+    assert raft_set_active(i2, target=i1, is_active=False) == [False]
 
-    assert i1.call(".raft_deactivate", i2.instance_id, i2.cluster_id) == [{}]
-    assert i1.call(".raft_deactivate", i2.instance_id, i2.cluster_id) == [{}]
+    assert raft_set_active(i2, target=i2, is_active=True) == [True]
+    assert raft_set_active(i2, target=i2, is_active=True) == [True]
-- 
GitLab