diff --git a/src/lib.rs b/src/lib.rs
index 3cc8b199d1f800455499f95b72456f1bf2e65683..adfccd652edb15b479c1012aa93f6b337f854cf9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -42,6 +42,7 @@ pub mod on_shutdown;
 pub mod replicaset;
 pub mod rpc;
 pub mod schema;
+pub mod sentinel;
 pub mod sql;
 pub mod storage;
 pub mod sync;
@@ -701,6 +702,8 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces
             }
         }
     }
+
+    node.sentinel_loop.on_self_activate();
 }
 
 pub async fn tt_expel(args: args::Expel) {
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index d460c15d5d7c654c9e0d187a42d731b50067c40b..c1cec1d759b95fc010c43ae8a6145573132177d6 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -1,30 +1,21 @@
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
 use ::tarantool::fiber;
 
 use crate::has_grades;
-use crate::instance::grade::TargetGradeVariant;
-use crate::rpc;
-use crate::rpc::update_instance::handle_update_instance_request_and_wait;
 use crate::storage::ClusterwideSpaceId;
 use crate::tlog;
-use crate::traft;
 use crate::traft::node;
 use crate::unwrap_ok_or;
 
 pub async fn callback() {
-    // 1. Try setting target grade Offline in a separate fiber
     tlog!(Info, "trying to shutdown gracefully ...");
-    let go_offline = fiber::Builder::new()
-        .name("go_offline")
-        .func(|| match go_offline() {
-            Ok(()) => tlog!(Info, "target grade set Offline"),
-            Err(e) => tlog!(Error, "failed setting target grade Offline: {e}"),
-        });
-    std::mem::forget(go_offline.start());
+    let node = node::global().unwrap();
+
+    // 1. Wake up the sentinel so it starts trying to set target grade Offline.
+    node.sentinel_loop.on_shut_down();
+    fiber::reschedule();
 
     // 2. Meanwhile, wait until either it succeeds or there is no quorum.
-    let node = node::global().unwrap();
     let raft_id = node.raft_id();
     let mut instances_watcher = node.storage_watcher(ClusterwideSpaceId::Instance);
     loop {
@@ -66,30 +57,3 @@ pub async fn callback() {
         }
     }
 }
-
-fn go_offline() -> traft::Result<()> {
-    let node = node::global()?;
-    let raft_id = node.raft_id();
-
-    let instance = node.storage.instances.get(&raft_id)?;
-    let cluster_id = node.raft_storage.cluster_id()?;
-
-    let req = rpc::update_instance::Request::new(instance.instance_id, cluster_id)
-        .with_target_grade(TargetGradeVariant::Offline);
-
-    loop {
-        let now = Instant::now();
-        let wait_before_retry = Duration::from_millis(300);
-
-        match handle_update_instance_request_and_wait(req.clone(), wait_before_retry) {
-            Ok(_) => break Ok(()),
-            Err(e) => {
-                tlog!(Warning,
-                    "failed setting target grade Offline: {e}, retrying ...";
-                );
-                fiber::sleep(wait_before_retry.saturating_sub(now.elapsed()));
-                continue;
-            }
-        };
-    }
-}
diff --git a/src/rpc/update_instance.rs b/src/rpc/update_instance.rs
index 03a7f9186b384fe63a2618ac45242496b8547f6d..66e6d186443956ddd6dc8bccdd995c98d95c5556 100644
--- a/src/rpc/update_instance.rs
+++ b/src/rpc/update_instance.rs
@@ -44,6 +44,8 @@ crate::define_rpc_request! {
         /// Can be set by instance
         pub target_grade: Option<TargetGradeVariant>,
         pub failure_domain: Option<FailureDomain>,
+        /// If `true` then the resulting CaS request is not retried upon failure.
+        pub dont_retry: bool,
     }
 
     pub struct Response {}
@@ -55,10 +57,16 @@ impl Request {
         Self {
             instance_id,
             cluster_id,
+            dont_retry: false,
             ..Request::default()
         }
     }
     #[inline]
+    pub fn with_dont_retry(mut self, value: bool) -> Self {
+        self.dont_retry = value;
+        self
+    }
+    #[inline]
     pub fn with_current_grade(mut self, value: CurrentGrade) -> Self {
         self.current_grade = Some(value);
         self
@@ -127,7 +135,10 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration)
             }
         }
         Err(err) => {
-            if err.is_cas_err() | err.is_term_mismatch_err() {
+            if req.dont_retry {
+                return Err(err);
+            }
+            if err.is_cas_err() || err.is_term_mismatch_err() {
                 // cas error - retry
                 fiber::sleep(Duration::from_millis(500));
                 continue;
diff --git a/src/sentinel.rs b/src/sentinel.rs
new file mode 100644
index 0000000000000000000000000000000000000000..5ef46dcc1d9fe1b4bcc5fb82f03e534a839f53e7
--- /dev/null
+++ b/src/sentinel.rs
@@ -0,0 +1,230 @@
+use crate::has_grades;
+use crate::instance::grade::TargetGradeVariant;
+use crate::r#loop::FlowControl::{self, Break, Continue};
+use crate::rpc;
+use crate::storage::Clusterwide;
+use crate::tlog;
+use crate::traft::error::Error;
+use crate::traft::network::ConnectionPool;
+use crate::traft::network::InstanceReachabilityManagerRef;
+use crate::traft::{node, RaftSpaceAccess};
+use ::tarantool::fiber;
+use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
+use ::tarantool::fiber::r#async::watch;
+use std::rc::Rc;
+use std::time::Duration;
+
+impl Loop {
+    /// A value for non-urgent timeouts, e.g. nothing needed to be done during
+    /// a loop iteration.
+    const SENTINEL_LONG_SLEEP: Duration = Duration::from_secs(1);
+
+    /// A value for urgent timeouts, e.g. retry of failed update peer request.
+    const SENTINEL_SHORT_RETRY: Duration = Duration::from_millis(300);
+
+    const UPDATE_INSTANCE_TIMEOUT: Duration = Duration::from_secs(3);
+
+    async fn iter_fn(
+        State {
+            pool,
+            storage,
+            raft_storage,
+            raft_status,
+            status,
+            instance_reachability,
+        }: &mut State,
+    ) -> FlowControl {
+        if status.get() == SentinelStatus::Initial || node::global().is_err() {
+            tlog!(Info, "waiting until initialized...");
+            _ = status.changed().timeout(Self::SENTINEL_LONG_SLEEP).await;
+            return Continue;
+        }
+
+        let node = node::global().expect("just checked it's ok");
+        let cluster_id = raft_storage.cluster_id().expect("storage shouldn't fail");
+
+        ////////////////////////////////////////////////////////////////////////
+        // Awoken during graceful shutdown.
+        // Should change own target grade to Offline and finish.
+        if status.get() == SentinelStatus::ShuttingDown {
+            let raft_id = node.raft_id();
+            let instance = storage
+                .instances
+                .get(&raft_id)
+                .expect("storage shouldn't fail");
+
+            let req = rpc::update_instance::Request::new(instance.instance_id, cluster_id)
+                .with_target_grade(TargetGradeVariant::Offline);
+
+            tlog!(Info, "setting own target grade Offline");
+            let timeout = Self::SENTINEL_SHORT_RETRY;
+            loop {
+                let now = fiber::clock();
+                let res = async {
+                    let Some(leader_id) = raft_status.get().leader_id else {
+                        return Err(Error::LeaderUnknown);
+                    };
+                    pool.call(&leader_id, &req, timeout)?.await?;
+                    Ok(())
+                }
+                .await;
+                match res {
+                    Ok(_) => return Break,
+                    Err(e) => {
+                        tlog!(Warning,
+                            "failed setting own target grade Offline: {e}, retrying ...";
+                        );
+                        fiber::sleep(timeout.saturating_sub(now.elapsed()));
+                        continue;
+                    }
+                }
+            }
+        }
+
+        ////////////////////////////////////////////////////////////////////////
+        // When running on leader, find any unreachable instances which need to
+        // have their grade automatically changed.
+        if raft_status.get().raft_state.is_leader() {
+            let instances = storage
+                .instances
+                .all_instances()
+                .expect("storage shouldn't fail");
+            let unreachables = instance_reachability.borrow().get_unreachables();
+            let mut instance_to_downgrade = None;
+            for instance in &instances {
+                if has_grades!(instance, * -> Online) && unreachables.contains(&instance.raft_id) {
+                    instance_to_downgrade = Some(instance);
+                }
+            }
+            let Some(instance) = instance_to_downgrade else {
+                _ = status.changed().timeout(Self::SENTINEL_LONG_SLEEP).await;
+                return Continue;
+            };
+
+            tlog!(Info, "setting target grade Offline"; "instance_id" => %instance.instance_id);
+            let req = rpc::update_instance::Request::new(instance.instance_id.clone(), cluster_id)
+                // We only try setting the grade once and if a CaS conflict
+                // happens we should reassess the situation, because somebody
+                // else could have changed this particular instance's target grade.
+                .with_dont_retry(true)
+                .with_target_grade(TargetGradeVariant::Offline);
+            let res = rpc::update_instance::handle_update_instance_request_and_wait(
+                req,
+                Self::UPDATE_INSTANCE_TIMEOUT,
+            );
+            if let Err(e) = res {
+                tlog!(Warning,
+                    "failed setting target grade Offline: {e}";
+                    "instance_id" => %instance.instance_id,
+                );
+            }
+
+            _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await;
+            return Continue;
+        }
+
+        ////////////////////////////////////////////////////////////////////////
+        // When running not on leader, check if own target grade has been
+        // automatically changed to Offline and try to update it back to Online.
+        let raft_id = node.raft_id();
+        let instance = storage
+            .instances
+            .get(&raft_id)
+            .expect("storage shouldn't fail");
+        if has_grades!(instance, * -> Offline) {
+            tlog!(Info, "setting own target grade Online");
+            let req = rpc::update_instance::Request::new(instance.instance_id.clone(), cluster_id)
+                // We only try setting the grade once and if a CaS conflict
+                // happens we should reassess the situation, because somebody
+                // else could have changed this particular instance's target grade.
+                .with_dont_retry(true)
+                .with_target_grade(TargetGradeVariant::Online);
+            let res = async {
+                let Some(leader_id) = raft_status.get().leader_id else {
+                    return Err(Error::LeaderUnknown);
+                };
+                pool.call(&leader_id, &req, Self::UPDATE_INSTANCE_TIMEOUT)?
+                    .await?;
+                Ok(())
+            }
+            .await;
+            if let Err(e) = res {
+                tlog!(Warning, "failed setting own target grade Online: {e}");
+            }
+
+            _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await;
+            return Continue;
+        }
+
+        _ = status.changed().timeout(Self::SENTINEL_LONG_SLEEP).await;
+        return Continue;
+    }
+
+    pub fn start(
+        pool: Rc<ConnectionPool>,
+        raft_status: watch::Receiver<node::Status>,
+        storage: Clusterwide,
+        raft_storage: RaftSpaceAccess,
+        instance_reachability: InstanceReachabilityManagerRef,
+    ) -> Self {
+        let (status_tx, status_rx) = watch::channel(SentinelStatus::Initial);
+
+        let state = State {
+            pool,
+            storage,
+            raft_storage,
+            raft_status,
+            status: status_rx,
+            instance_reachability,
+        };
+
+        Self {
+            _loop: crate::loop_start!("sentinel_loop", Self::iter_fn, state),
+            status: status_tx,
+        }
+    }
+
+    pub fn on_shut_down(&self) {
+        self.status
+            .send(SentinelStatus::ShuttingDown)
+            .expect("we shouldn't be holding references to the value")
+    }
+
+    pub fn on_self_activate(&self) {
+        self.status
+            .send(SentinelStatus::Activated)
+            .expect("we shouldn't be holding references to the value")
+    }
+}
+
+pub struct Loop {
+    _loop: Option<fiber::UnitJoinHandle<'static>>,
+    status: watch::Sender<SentinelStatus>,
+}
+
+/// Describes possible states of the current instance with respect to what
+/// sentinel should be doing.
+///
+/// TODO: maybe this should be merged with [`node::Status`].
+#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
+enum SentinelStatus {
+    /// Instance has started, but didn't yet receive confirmation from the
+    /// leader that it was activated.
+    #[default]
+    Initial,
+
+    /// Instance has been activated, sentinel is doing its normal job.
+    Activated,
+
+    /// Instance is currently gracefully shutting down.
+    ShuttingDown,
+}
+
+struct State {
+    pool: Rc<ConnectionPool>,
+    storage: Clusterwide,
+    raft_storage: RaftSpaceAccess,
+    raft_status: watch::Receiver<node::Status>,
+    status: watch::Receiver<SentinelStatus>,
+    instance_reachability: InstanceReachabilityManagerRef,
+}
diff --git a/src/traft/network.rs b/src/traft/network.rs
index 5a93d6db5d5aa726dff9835b0ae6c48ce0820906..dceb03c591ab13be3e22fbf5ad8f886607f7ec96 100644
--- a/src/traft/network.rs
+++ b/src/traft/network.rs
@@ -16,6 +16,7 @@ use futures::FutureExt as _;
 use std::cell::RefCell;
 use std::cell::UnsafeCell;
 use std::collections::HashMap;
+use std::collections::HashSet;
 use std::collections::VecDeque;
 use std::pin::Pin;
 use std::rc::Rc;
@@ -410,6 +411,7 @@ pub type InstanceReachabilityManagerRef = Rc<RefCell<InstanceReachabilityManager
 
 impl InstanceReachabilityManager {
     // TODO: make these configurable via _pico_property
+    const MAX_TIME_SINCE_SUCCESS: Duration = Duration::from_secs(5);
     const MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5);
 
     pub fn new(storage: Clusterwide) -> Self {
@@ -420,7 +422,8 @@ impl InstanceReachabilityManager {
     }
 
     /// Is called from a connection pool worker loop to report results of raft
-    /// messages sent to other instances. Updates info for the given instance.
+    /// messages sent to other instances. For example, a timeout is considered
+    /// a failure. Updates info for the given instance.
     pub fn report_result(&mut self, raft_id: RaftId, success: bool) {
         let now = fiber::clock();
         let info = self.infos.entry(raft_id).or_insert_with(Default::default);
@@ -446,18 +449,11 @@
     pub fn take_unreachables_to_report(&mut self) -> Vec<RaftId> {
         let mut res = Vec::with_capacity(16);
         for (raft_id, info) in &mut self.infos {
-            if info.last_success.is_none() {
-                // Don't report nodes which didn't previously respond once,
-                // so that they can safely boot atleast.
-                continue;
-            }
             if info.is_reported {
                 // Don't report nodes repeatedly.
                 continue;
             }
-            // TODO: add configurable parameters, for example number of attempts
-            // before report.
-            if info.fail_streak > 0 {
+            if Self::determine_reachability(info) == Unreachable {
                 res.push(*raft_id);
                 info.is_reported = true;
             }
@@ -465,6 +461,39 @@
         res
     }
 
+    /// Is called by sentinel to get information about which instances should be
+    /// automatically assigned a different grade.
+    pub fn get_unreachables(&self) -> HashSet<RaftId> {
+        let mut res = HashSet::with_capacity(self.infos.len() / 3);
+        for (raft_id, info) in &self.infos {
+            if Self::determine_reachability(info) == Unreachable {
+                res.insert(*raft_id);
+            }
+        }
+        return res;
+    }
+
+    /// Make a decision on the given instance's reachability based on the
+    /// provided `info`. This is an internal function.
+    fn determine_reachability(info: &InstanceReachabilityInfo) -> ReachabilityState {
+        let Some(last_success) = info.last_success else {
+            // Don't make decisions about instances which didn't previously
+            // respond once so as to not interrupt the process of booting up.
+            // TODO: report unreachable if fail_streak is big enough.
+            return Undecided;
+        };
+        if info.fail_streak == 0 {
+            // Didn't fail once, so can't be unreachable.
+            return Reachable;
+        }
+        let now = fiber::clock();
+        if now.duration_since(last_success) > Self::MAX_TIME_SINCE_SUCCESS {
+            Unreachable
+        } else {
+            Reachable
+        }
+    }
+
     /// Is called from raft main loop when handling raft messages, passing a
     /// raft id of an instance which was previously determined to be unreachable.
     /// This function makes a decision about how often raft hearbeat messages
@@ -504,8 +533,9 @@
         // DN == attemptN.duration_since(last_success)
         //
         let now = fiber::clock();
-        let wait_timeout = last_attempt.duration_since(last_success).min(Self::MAX_HEARTBEAT_PERIOD);
-        if now > last_attempt + wait_timeout {
+        let since_last_success = last_attempt.duration_since(last_success);
+        let delay = since_last_success.min(Self::MAX_HEARTBEAT_PERIOD);
+        if now > last_attempt + delay {
             return true;
         }
 
@@ -513,6 +543,14 @@
     }
 }
 
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum ReachabilityState {
+    Undecided,
+    Reachable,
+    Unreachable,
+}
+use ReachabilityState::*;
+
 /// Information about recent attempts to communicate with a single given instance.
 #[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct InstanceReachabilityInfo {
diff --git a/src/traft/node.rs b/src/traft/node.rs
index b1b09c9f5f022a3208da5e047e359286b248a26c..8ddad96eae3e02aa8bf1038b063bb96294d1541b 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -13,6 +13,7 @@ use crate::loop_start;
 use crate::r#loop::FlowControl;
 use crate::rpc;
 use crate::schema::{Distribution, IndexDef, SpaceDef};
+use crate::sentinel;
 use crate::storage::acl;
 use crate::storage::ddl_meta_drop_space;
 use crate::storage::SnapshotData;
@@ -64,6 +65,7 @@ use ::tarantool::vclock::Vclock;
 use protobuf::Message as _;
 
 use std::cell::Cell;
+use std::cell::RefCell;
 use std::cmp::Ordering;
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
@@ -141,6 +143,7 @@ pub struct Node {
     pub(crate) raft_storage: RaftSpaceAccess,
     pub(crate) main_loop: MainLoop,
     pub(crate) governor_loop: governor::Loop,
+    pub(crate) sentinel_loop: sentinel::Loop,
     status: watch::Receiver<Status>,
     watchers: Rc<Mutex<StorageWatchers>>,
 
@@ -171,7 +174,7 @@ impl Node {
         let instance_reachability = Rc::new(RefCell::new(InstanceReachabilityManager::new(
             storage.clone(),
         )));
-        pool.instance_reachability = instance_reachability;
+        pool.instance_reachability = instance_reachability.clone();
         let pool = Rc::new(pool);
 
         let node_impl = NodeImpl::new(pool.clone(), storage.clone(), raft_storage.clone())?;
@@ -186,10 +189,17 @@ impl Node {
             raft_id,
             main_loop: MainLoop::start(node_impl.clone(), watchers.clone()), // yields
             governor_loop: governor::Loop::start(
+                pool.clone(),
+                status.clone(),
+                storage.clone(),
+                raft_storage.clone(),
+            ),
+            sentinel_loop: sentinel::Loop::start(
                 pool,
                 status.clone(),
                 storage.clone(),
                 raft_storage.clone(),
+                instance_reachability,
            ),
             node_impl,
             storage,
diff --git a/test/int/test_network_effects.py b/test/int/test_network_effects.py
index 152995ddb3b93b8f99fced19c06bfafc4dcb3655..08152f40e25965b88197a3d0edb988c6148c7210 100644
--- a/test/int/test_network_effects.py
+++ b/test/int/test_network_effects.py
@@ -1,6 +1,7 @@
 import pytest
+import time
 
-from conftest import Cluster, Instance, retrying, ReturnError
+from conftest import Cluster, Instance, retrying, ReturnError, Retriable
 
 
 @pytest.fixture
@@ -143,3 +144,31 @@ def test_leader_disruption(cluster3: Cluster):
 
     # i3 should become the follower again without disrupting i1
     retrying(lambda: i3.assert_raft_status("Follower", i1.raft_id))
+
+
+def get_instance_grades(peer: Instance, instance_id) -> tuple[str, str]:
+    instance_info = peer.call("pico.instance_info", instance_id)
+    return (
+        instance_info["current_grade"]["variant"],
+        instance_info["target_grade"]["variant"],
+    )
+
+
+def test_instance_automatic_offline_detection(cluster: Cluster):
+    i1, i2, i3 = cluster.deploy(instance_count=3)
+
+    assert get_instance_grades(i1, i3.instance_id) == ("Online", "Online")
+
+    i3.kill()
+
+    # Give the governor some time to detect the problem and act accordingly.
+    time.sleep(10)
+
+    assert get_instance_grades(i1, i3.instance_id) == ("Offline", "Offline")
+
+    i3.start()
+
+    def assert_online(peer, instance_id):
+        assert get_instance_grades(peer, instance_id) == ("Online", "Online")
+
+    Retriable(timeout=6, rps=5).call(lambda: assert_online(i1, i3.instance_id))
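
Reviewer note (not part of the patch): the core of the new auto-offline behaviour is the reachability rule added to InstanceReachabilityManager — an instance is Undecided until it has responded at least once, Reachable while its fail streak is zero, and Unreachable only once it is both failing and has been silent for longer than MAX_TIME_SINCE_SUCCESS (5 s). The sketch below is a minimal standalone model of just that rule, with std::time::Instant standing in for tarantool's fiber::clock() so it compiles on its own; the names mirror the patch, everything else is illustrative.

```rust
// Standalone model of the reachability decision introduced in src/traft/network.rs.
// `Instant` replaces fiber::clock() here; the 5-second threshold matches the patch.
use std::time::{Duration, Instant};

const MAX_TIME_SINCE_SUCCESS: Duration = Duration::from_secs(5);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReachabilityState {
    Undecided,
    Reachable,
    Unreachable,
}

struct ReachabilityInfo {
    last_success: Option<Instant>,
    fail_streak: u32,
}

fn determine_reachability(info: &ReachabilityInfo, now: Instant) -> ReachabilityState {
    // Never responded yet: don't interfere with an instance that may still be booting.
    let Some(last_success) = info.last_success else {
        return ReachabilityState::Undecided;
    };
    // No failures since the last success: clearly reachable.
    if info.fail_streak == 0 {
        return ReachabilityState::Reachable;
    }
    // Failing *and* silent for longer than the threshold: report as unreachable,
    // which is what lets the sentinel set the instance's target grade to Offline.
    if now.duration_since(last_success) > MAX_TIME_SINCE_SUCCESS {
        ReachabilityState::Unreachable
    } else {
        ReachabilityState::Reachable
    }
}

fn main() {
    let start = Instant::now();
    let now = start + Duration::from_secs(10);

    // Failing and silent for 10 s > 5 s: unreachable.
    let silent = ReachabilityInfo { last_success: Some(start), fail_streak: 3 };
    assert_eq!(determine_reachability(&silent, now), ReachabilityState::Unreachable);

    // Still booting (never responded): no decision is made.
    let booting = ReachabilityInfo { last_success: None, fail_streak: 3 };
    assert_eq!(determine_reachability(&booting, now), ReachabilityState::Undecided);

    // Recently failing but the last success is fresh: still reachable.
    let flaky = ReachabilityInfo {
        last_success: Some(now - Duration::from_secs(1)),
        fail_streak: 1,
    };
    assert_eq!(determine_reachability(&flaky, now), ReachabilityState::Reachable);
}
```

In the patch itself, get_unreachables() is just this check applied across every tracked instance, and the sentinel loop consumes its result on the leader to downgrade target grades.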