Skip to content
Snippets Groups Projects
sentinel.rs 9.02 KiB
use crate::has_grades;
use crate::instance::GradeVariant::*;
use crate::reachability::InstanceReachabilityManagerRef;
use crate::rpc;
use crate::storage::Clusterwide;
use crate::tlog;
use crate::traft::error::Error;
use crate::traft::network::ConnectionPool;
use crate::traft::{node, RaftSpaceAccess};
use ::tarantool::fiber;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::watch;
use std::ops::ControlFlow;
use std::rc::Rc;
use std::time::Duration;

impl Loop {
    /// A value for non-urgent timeouts, e.g. nothing needed to be done during
    /// a loop iteration.
    const SENTINEL_LONG_SLEEP: Duration = Duration::from_secs(1);

    /// A value for urgent timeouts, e.g. retry of failed update peer request.
    const SENTINEL_SHORT_RETRY: Duration = Duration::from_millis(300);

    const UPDATE_INSTANCE_TIMEOUT: Duration = Duration::from_secs(3);

    async fn iter_fn(
        State {
            pool,
            storage,
            raft_storage,
            raft_status,
            status,
            instance_reachability,
        }: &mut State,
    ) -> ControlFlow<()> {
        if status.get() == SentinelStatus::Initial || node::global().is_err() {
            tlog!(Debug, "waiting until initialized...");
            _ = status.changed().timeout(Self::SENTINEL_LONG_SLEEP).await;
            return ControlFlow::Continue(());
        }

        let node = node::global().expect("just checked it's ok");
        let cluster_id = raft_storage.cluster_id().expect("storage shouldn't fail");

        ////////////////////////////////////////////////////////////////////////
        // Awoken during graceful shutdown.
        // Should change own target grade to Offline and finish.
        if status.get() == SentinelStatus::ShuttingDown {
            let raft_id = node.raft_id();
            let Ok(instance) = storage.instances.get(&raft_id) else {
                // This can happen if for example a snapshot arrives
                // and we truncate _pico_instance (read uncommitted btw).
                // In this case we also just wait some more.
                _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await;
                return ControlFlow::Continue(());
            };

            let req = rpc::update_instance::Request::new(instance.instance_id, cluster_id)
                .with_target_grade(Offline);

            tlog!(Info, "setting own target grade Offline");
            let timeout = Self::SENTINEL_SHORT_RETRY;
            loop {
                let now = fiber::clock();
                let res = async {
                    let Some(leader_id) = raft_status.get().leader_id else {
                        return Err(Error::LeaderUnknown);
                    };
                    pool.call(&leader_id, &req, timeout)?.await?;
                    Ok(())
                }
                .await;
                match res {
                    Ok(_) => return ControlFlow::Break(()),
                    Err(e) => {
                        tlog!(Warning,
                            "failed setting own target grade Offline: {e}, retrying ...";
                        );
                        fiber::sleep(timeout.saturating_sub(now.elapsed()));
                        continue;
                    }
                }
            }
        }

        ////////////////////////////////////////////////////////////////////////
        // When running on leader, find any unreachable instances which need to
        // have their grade automatically changed.
        if raft_status.get().raft_state.is_leader() {
            let instances = storage
                .instances
                .all_instances()
                .expect("storage shouldn't fail");
            let unreachables = instance_reachability.borrow().get_unreachables();
            let mut instance_to_downgrade = None;
            for instance in &instances {
                if has_grades!(instance, * -> Online) && unreachables.contains(&instance.raft_id) {
                    instance_to_downgrade = Some(instance);
                }
            }
            let Some(instance) = instance_to_downgrade else {
                _ = status.changed().timeout(Self::SENTINEL_LONG_SLEEP).await;
                return ControlFlow::Continue(());
            };

            tlog!(Info, "setting target grade Offline"; "instance_id" => %instance.instance_id);
            let req = rpc::update_instance::Request::new(instance.instance_id.clone(), cluster_id)
                // We only try setting the grade once and if a CaS conflict
                // happens we should reassess the situation, because somebody
                // else could have changed this particular instance's target grade.
                .with_dont_retry(true)
                .with_target_grade(Offline);
            let res = rpc::update_instance::handle_update_instance_request_and_wait(
                req,
                Self::UPDATE_INSTANCE_TIMEOUT,
            );
            if let Err(e) = res {
                tlog!(Warning,
                    "failed setting target grade Offline: {e}";
                    "instance_id" => %instance.instance_id,
                );
            }

            _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await;
            return ControlFlow::Continue(());
        }

        ////////////////////////////////////////////////////////////////////////
        // When running not on leader, check if own target has automatically
        // changed to Offline and try to update it to Online.
        let raft_id = node.raft_id();
        let Ok(instance) = storage.instances.get(&raft_id) else {
            // This can happen if for example a snapshot arrives
            // and we truncate _pico_instance (read uncommitted btw).
            // In this case we also just wait some more.
            _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await;
            return ControlFlow::Continue(());
        };
        if has_grades!(instance, * -> Offline) {
            tlog!(Info, "setting own target grade Online");
            let req = rpc::update_instance::Request::new(instance.instance_id.clone(), cluster_id)
                // We only try setting the grade once and if a CaS conflict
                // happens we should reassess the situation, because somebody
                // else could have changed this particular instance's target grade.
                .with_dont_retry(true)
                .with_target_grade(Online);
            let res = async {
                let Some(leader_id) = raft_status.get().leader_id else {
                    return Err(Error::LeaderUnknown);
                };
                pool.call(&leader_id, &req, Self::UPDATE_INSTANCE_TIMEOUT)?
                    .await?;
                Ok(())
            }
            .await;
            if let Err(e) = res {
                tlog!(Warning, "failed setting own target grade Online: {e}");
            }

            _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await;
            return ControlFlow::Continue(());
        }

        _ = status.changed().timeout(Self::SENTINEL_LONG_SLEEP).await;
        return ControlFlow::Continue(());
    }

    pub fn start(
        pool: Rc<ConnectionPool>,
        raft_status: watch::Receiver<node::Status>,
        storage: Clusterwide,
        raft_storage: RaftSpaceAccess,
        instance_reachability: InstanceReachabilityManagerRef,
    ) -> Self {
        let (status_tx, status_rx) = watch::channel(SentinelStatus::Initial);

        let state = State {
            pool,
            storage,
            raft_storage,
            raft_status,
            status: status_rx,
            instance_reachability,
        };

        Self {
            _loop: crate::loop_start!("sentinel_loop", Self::iter_fn, state),
            status: status_tx,
        }
    }

    pub fn on_shut_down(&self) {
        self.status
            .send(SentinelStatus::ShuttingDown)
            .expect("we shouldn't be holding references to the value")
    }

    pub fn on_self_activate(&self) {
        self.status
            .send(SentinelStatus::Activated)
            .expect("we shouldn't be holding references to the value")
    }
}

pub struct Loop {
    _loop: Option<fiber::JoinHandle<'static, ()>>,
    status: watch::Sender<SentinelStatus>,
}

/// Describes possible states of the current instance with respect to what
/// sentinel should be doing.
///
/// TODO: maybe this should be merged with [`node::Status`].
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
enum SentinelStatus {
    /// Instance has started, but didn't yet receive confirmation from the
    /// leader that it was activated.
    #[default]
    Initial,

    /// Instance has been activated, sentinel is doing it's normal job.
    Activated,

    /// Instance is currently gracefully shutting down.
    ShuttingDown,
}

struct State {
    pool: Rc<ConnectionPool>,
    storage: Clusterwide,
    raft_storage: RaftSpaceAccess,
    raft_status: watch::Receiver<node::Status>,
    status: watch::Receiver<SentinelStatus>,
    instance_reachability: InstanceReachabilityManagerRef,
}