Skip to content
Snippets Groups Projects
failover.rs 4.89 KiB
use std::time::{Duration, Instant};

use ::tarantool::fiber::sleep;
use ::tarantool::proc;
use ::tarantool::unwrap_or;

use crate::{stringify_cfunc, tarantool, tlog};

use crate::traft::error::Error;
use crate::traft::event;
use crate::traft::node;
use crate::traft::Storage;
use crate::traft::TargetGrade;
use crate::traft::{UpdatePeerRequest, UpdatePeerResponse};

pub fn on_shutdown() {
    let node = node::global().unwrap();
    let voters = node
        .storage
        .voters()
        .expect("failed reading voters")
        .unwrap_or_default();
    let learners = node
        .storage
        .learners()
        .expect("failed reading learners")
        .unwrap_or_default();
    let have_active_learners = learners.into_iter().any(|raft_id| {
        Storage::peer_by_raft_id(raft_id)
            .expect("failed reading peer")
            .map(|peer| peer.is_active())
            .unwrap_or(false)
    });
    let raft_id = node.status().id;

    if voters == [raft_id] && !have_active_learners {
        tlog!(Warning, "the last active instance has shut down");
        return;
    }

    let peer = Storage::peer_by_raft_id(raft_id).unwrap().unwrap();
    let cluster_id = node
        .storage
        .cluster_id()
        .unwrap()
        .expect("cluster_id must be present");
    let req = UpdatePeerRequest::new(peer.instance_id, cluster_id)
        .with_target_grade(TargetGrade::Offline);

    let fn_name = stringify_cfunc!(raft_update_peer);
    // will run until we get successfully deactivate or tarantool shuts down
    // the on_shutdown fiber (after 3 secs)
    loop {
        let node = node::global().unwrap();
        let leader_id = unwrap_or!(node.status().leader_id, {
            node.wait_status();
            continue;
        });
        if leader_id == raft_id {
            if let Err(e) = node.handle_topology_request_and_wait(req.into()) {
                crate::warn_or_panic!("failed to deactivate myself: {}", e);
            }
            break;
        }
        let leader = Storage::peer_by_raft_id(leader_id).unwrap().unwrap();
        let wait_before_retry = Duration::from_millis(300);
        let now = Instant::now();

        match tarantool::net_box_call(&leader.peer_address, fn_name, &req, Duration::MAX) {
            Err(e) => {
                tlog!(Warning, "failed to deactivate myself: {e}";
                    "peer" => &leader.peer_address,
                    "fn" => fn_name,
                );
                sleep(wait_before_retry.saturating_sub(now.elapsed()));
                continue;
            }
            Ok(UpdatePeerResponse { .. }) => {
                break;
            }
        };
    }

    // no need to wait for demotion if we weren't a voter
    if !voters.contains(&raft_id) {
        return;
    }

    if let Err(e) = event::wait(event::Event::Demoted) {
        tlog!(Warning, "failed to wait for self demotion: {e}");
    }
}

#[proc(packed_args)]
fn raft_update_peer(
    req: UpdatePeerRequest,
) -> Result<UpdatePeerResponse, Box<dyn std::error::Error>> {
    let node = node::global()?;

    let cluster_id = node
        .storage
        .cluster_id()?
        .ok_or("cluster_id is not set yet")?;

    if req.cluster_id != cluster_id {
        return Err(Box::new(Error::ClusterIdMismatch {
            instance_cluster_id: req.cluster_id,
            cluster_cluster_id: cluster_id,
        }));
    }

    let mut req = req;
    let instance_id = &*req.instance_id;
    req.changes.retain(|ch| match ch {
        super::PeerChange::Grade(grade) => {
            tlog!(Warning, "attempt to change grade by peer";
                "instance_id" => instance_id,
                "grade" => grade.as_str(),
            );
            false
        }
        _ => true,
    });
    node.handle_topology_request_and_wait(req.into())?;
    Ok(UpdatePeerResponse {})
}

pub fn voters_needed(voters: usize, total: usize) -> i64 {
    let voters_expected = match total {
        0 => {
            crate::warn_or_panic!("`voters_needed` was called with `total` = 0");
            0
        }
        1 => 1,
        2 => 2,
        3..=4 => 3,
        5.. => 5,
        _ => unreachable!(
            "just another thing rust is garbage at:
             `5..` covers all the rest of the values,
             but rust can't figure this out"
        ),
    };
    voters_expected - (voters as i64)
}

#[cfg(test)]
mod tests {
    #[test]
    fn voters_needed() {
        assert_eq!(super::voters_needed(0, 1), 1);
        assert_eq!(super::voters_needed(1, 1), 0);
        assert_eq!(super::voters_needed(2, 1), -1);
        assert_eq!(super::voters_needed(0, 2), 2);
        assert_eq!(super::voters_needed(2, 3), 1);
        assert_eq!(super::voters_needed(6, 4), -3);
        assert_eq!(super::voters_needed(1, 5), 4);
        assert_eq!(super::voters_needed(1, 999), 4);
        assert_eq!(super::voters_needed(0, usize::MAX), 5);
        assert_eq!(super::voters_needed(0, u64::MAX as _), 5);
    }
}