diff --git a/src/reachability.rs b/src/reachability.rs
index 366ed585e7a549d3d26aff47fe9327e3fe4520ef..3de3f0509f7751104d2c5ba7b58a0b4848bc4697 100644
--- a/src/reachability.rs
+++ b/src/reachability.rs
@@ -1,4 +1,5 @@
 use crate::storage::Clusterwide;
+use crate::storage::PropertyName;
 use crate::traft::RaftId;
 use std::cell::RefCell;
 use std::collections::HashMap;
@@ -26,8 +27,8 @@ pub type InstanceReachabilityManagerRef = Rc<RefCell<InstanceReachabilityManager
 
 impl InstanceReachabilityManager {
     // TODO: make these configurable via _pico_property
-    const MAX_TIME_SINCE_SUCCESS: Duration = Duration::from_secs(5);
-    const MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5);
+    const DEFAULT_AUTO_OFFLINE_TIMEOUT: Duration = Duration::from_secs(5);
+    const DEFAULT_MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5);
 
     pub fn new(storage: Clusterwide) -> Self {
         Self {
@@ -62,13 +63,18 @@ impl InstanceReachabilityManager {
     /// Called at the beginning of the raft main loop to get information
     /// about which instances should be reported as unreachable to the raft node.
    pub fn take_unreachables_to_report(&mut self) -> Vec<RaftId> {
+        // This is how you turn off the borrow checker by the way.
+        let this = self as *const Self;
+
         let mut res = Vec::with_capacity(16);
         for (raft_id, info) in &mut self.infos {
             if info.is_reported {
                 // Don't report nodes repeatedly.
                 continue;
             }
-            if Self::determine_reachability(info) == Unreachable {
+            // SAFETY: this is safe because `determine_reachability` doesn't
+            // access `self.infos`, which is mutably borrowed by this loop.
+            if unsafe { &*this }.determine_reachability(info) == Unreachable {
                 res.push(*raft_id);
                 info.is_reported = true;
             }
@@ -81,7 +87,7 @@ impl InstanceReachabilityManager {
     pub fn get_unreachables(&self) -> HashSet<RaftId> {
         let mut res = HashSet::with_capacity(self.infos.len() / 3);
         for (raft_id, info) in &self.infos {
-            if Self::determine_reachability(info) == Unreachable {
+            if self.determine_reachability(info) == Unreachable {
                 res.insert(*raft_id);
             }
         }
@@ -90,7 +96,7 @@ impl InstanceReachabilityManager {
 
     /// Make a decision about the given instance's reachability based on the
     /// provided `info`. This is an internal function.
-    fn determine_reachability(info: &InstanceReachabilityInfo) -> ReachabilityState {
+    fn determine_reachability(&self, info: &InstanceReachabilityInfo) -> ReachabilityState {
         let Some(last_success) = info.last_success else {
             // Don't make decisions about instances which haven't yet responded
             // even once, so as not to interrupt the process of booting up.
@@ -102,7 +108,7 @@ impl InstanceReachabilityManager {
             return Reachable;
         }
         let now = fiber::clock();
-        if now.duration_since(last_success) > Self::MAX_TIME_SINCE_SUCCESS {
+        if now.duration_since(last_success) > self.auto_offline_timeout() {
             Unreachable
         } else {
             Reachable
@@ -149,13 +155,43 @@ impl InstanceReachabilityManager {
 
         // let now = fiber::clock();
         let since_last_success = last_attempt.duration_since(last_success);
-        let delay = since_last_success.min(Self::MAX_HEARTBEAT_PERIOD);
+        let delay = since_last_success.min(self.max_heartbeat_period());
         if now > last_attempt + delay {
             return true;
         }
 
         return false;
     }
+
+    fn auto_offline_timeout(&self) -> Duration {
+        // TODO: it would be better for cache locality and overall performance
+        // if we didn't look this value up in the storage every time. Instead we
+        // could store it in a field of this struct and only update its value
+        // once per raft loop iteration by calling a method `update_configuration`
+        // or something like that.
+        if let Some(storage) = &self.storage {
+            // FIXME: silently ignoring an error if the user specified a value
+            // of the wrong type.
+            if let Ok(Some(t)) = storage.properties.get(PropertyName::AutoOfflineTimeout) {
+                return Duration::from_secs_f64(t);
+            }
+        }
+        Self::DEFAULT_AUTO_OFFLINE_TIMEOUT
+    }
+
+    fn max_heartbeat_period(&self) -> Duration {
+        // TODO: same as in `auto_offline_timeout` above: caching this value in
+        // a field and refreshing it once per raft loop iteration would be
+        // better for performance than a storage lookup on every call.
+        if let Some(storage) = &self.storage {
+            // FIXME: silently ignoring an error if the user specified a value
+            // of the wrong type.
+            if let Ok(Some(t)) = storage.properties.get(PropertyName::MaxHeartbeatPeriod) {
+                return Duration::from_secs_f64(t);
+            }
+        }
+        Self::DEFAULT_MAX_HEARTBEAT_PERIOD
+    }
 }
 
 #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
diff --git a/src/storage.rs b/src/storage.rs
index c6e79aef713afbf5427c162722c37f3973d84015..c4b5e58cbae739c2cecbab6e637071d332ad9e21 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -992,6 +992,14 @@ pub trait TClusterwideSpaceIndex {
         NextSchemaVersion = "next_schema_version",
 
         PasswordMinLength = "password_min_length",
+
+        /// Number of seconds to wait before automatically changing an
+        /// unresponsive instance's grade to Offline.
+        AutoOfflineTimeout = "auto_offline_timeout",
+
+        /// Maximum number of seconds to wait before sending another heartbeat
+        /// to an unresponsive instance.
+        MaxHeartbeatPeriod = "max_heartbeat_period",
     }
 }
 
diff --git a/test/int/test_network_effects.py b/test/int/test_network_effects.py
index 08152f40e25965b88197a3d0edb988c6148c7210..384821fa926e79b7b83c64caa3f03484842f5b8b 100644
--- a/test/int/test_network_effects.py
+++ b/test/int/test_network_effects.py
@@ -156,13 +156,15 @@ def get_instance_grades(peer: Instance, instance_id) -> tuple[str, str]:
 
 
 def test_instance_automatic_offline_detection(cluster: Cluster):
     i1, i2, i3 = cluster.deploy(instance_count=3)
 
+    index = cluster.cas("insert", "_pico_property", ["auto_offline_timeout", 0.5])
+    cluster.raft_wait_index(index, 3)
+
     assert get_instance_grades(i1, i3.instance_id) == ("Online", "Online")
 
     i3.kill()
 
-    # Give the governor some time to detect the problem and act accordingly.
-    time.sleep(10)
+    # Give the sentinel some time to detect the problem and act accordingly.
+    time.sleep(2)
 
     assert get_instance_grades(i1, i3.instance_id) == ("Offline", "Offline")
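
Note on the `unsafe` in `take_unreachables_to_report`: the raw pointer exists only because the loop holds a mutable borrow of `self.infos` while `determine_reachability` now needs `&self` to reach the storage. A minimal sketch of a safe alternative, assuming nothing beyond what the patch shows (the helper `determine_reachability_with` is hypothetical, not part of this change):

```rust
pub fn take_unreachables_to_report(&mut self) -> Vec<RaftId> {
    // Read the configured timeout once, before `self.infos` gets borrowed
    // mutably by the loop.
    let timeout = self.auto_offline_timeout();
    let mut res = Vec::with_capacity(16);
    for (raft_id, info) in &mut self.infos {
        if info.is_reported {
            // Don't report nodes repeatedly.
            continue;
        }
        // An associated function takes no `self`, so the borrow checker is
        // satisfied without any raw pointers.
        if Self::determine_reachability_with(timeout, info) == Unreachable {
            res.push(*raft_id);
            info.is_reported = true;
        }
    }
    res
}

/// Hypothetical variant of `determine_reachability` that takes the timeout
/// as a parameter instead of reading it through `&self`.
fn determine_reachability_with(
    timeout: Duration,
    info: &InstanceReachabilityInfo,
) -> ReachabilityState {
    // Same body as `determine_reachability` above, with
    // `self.auto_offline_timeout()` replaced by `timeout`.
    todo!()
}
```

This keeps the hot loop free of `unsafe` at the cost of threading the timeout through explicitly; the `get_unreachables` path can keep calling the `&self` method.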
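The TODO in the two getters could be resolved roughly as sketched below: cache the durations in fields and refresh them once per raft main loop iteration. Everything here is illustrative, not part of the patch: the names (`cached_auto_offline_timeout`, `update_configuration`, `read_duration_property`) are made up, and the warning assumes the crate's `tlog!` logging macro. It also addresses the FIXME by reporting a wrong-typed value instead of silently ignoring it.

```rust
impl InstanceReachabilityManager {
    /// Refresh the cached configuration; intended to be called once at the
    /// top of each raft main loop iteration. `auto_offline_timeout()` and
    /// `max_heartbeat_period()` would then just return the cached fields.
    pub fn update_configuration(&mut self) {
        self.cached_auto_offline_timeout = self.read_duration_property(
            PropertyName::AutoOfflineTimeout,
            Self::DEFAULT_AUTO_OFFLINE_TIMEOUT,
        );
        self.cached_max_heartbeat_period = self.read_duration_property(
            PropertyName::MaxHeartbeatPeriod,
            Self::DEFAULT_MAX_HEARTBEAT_PERIOD,
        );
    }

    fn read_duration_property(&self, name: PropertyName, default: Duration) -> Duration {
        let Some(storage) = &self.storage else {
            return default;
        };
        match storage.properties.get(name) {
            Ok(Some(seconds)) => Duration::from_secs_f64(seconds),
            Ok(None) => default,
            Err(e) => {
                // Report the misconfiguration instead of silently falling
                // back to the default (see the FIXME in the patch).
                tlog!(Warning, "invalid reachability property value: {e}");
                default
            }
        }
    }
}
```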