Skip to content
Snippets Groups Projects
Commit 39668ec1 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Yaroslav Dynnikov
Browse files

feat: add auto_offline_timeout & max_heartbeat_period properties

parent 1daae4f6
No related branches found
No related tags found
1 merge request!624Feat/self-healing
use crate::storage::Clusterwide;
use crate::storage::PropertyName;
use crate::traft::RaftId;
use std::cell::RefCell;
use std::collections::HashMap;
......@@ -26,8 +27,8 @@ pub type InstanceReachabilityManagerRef = Rc<RefCell<InstanceReachabilityManager
impl InstanceReachabilityManager {
// TODO: make these configurable via _pico_property
const MAX_TIME_SINCE_SUCCESS: Duration = Duration::from_secs(5);
const MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5);
const DEFAULT_AUTO_OFFLINE_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5);
pub fn new(storage: Clusterwide) -> Self {
Self {
......@@ -62,13 +63,18 @@ impl InstanceReachabilityManager {
/// Is called at the beginning of the raft main loop to get information
/// about which instances should be reported as unreachable to the raft node.
pub fn take_unreachables_to_report(&mut self) -> Vec<RaftId> {
// This is how you turn off the borrow checker by the way.
let this = self as *const Self;
let mut res = Vec::with_capacity(16);
for (raft_id, info) in &mut self.infos {
if info.is_reported {
// Don't report nodes repeatedly.
continue;
}
if Self::determine_reachability(info) == Unreachable {
// SAFETY: this is safe because we're not accessing `self.infos` in
// this function.
if unsafe { &*this }.determine_reachability(info) == Unreachable {
res.push(*raft_id);
info.is_reported = true;
}
......@@ -81,7 +87,7 @@ impl InstanceReachabilityManager {
pub fn get_unreachables(&self) -> HashSet<RaftId> {
let mut res = HashSet::with_capacity(self.infos.len() / 3);
for (raft_id, info) in &self.infos {
if Self::determine_reachability(info) == Unreachable {
if self.determine_reachability(info) == Unreachable {
res.insert(*raft_id);
}
}
......@@ -90,7 +96,7 @@ impl InstanceReachabilityManager {
/// Make a descision on the given instance's reachability based on the
/// provided `info`. This is an internal function.
fn determine_reachability(info: &InstanceReachabilityInfo) -> ReachabilityState {
fn determine_reachability(&self, info: &InstanceReachabilityInfo) -> ReachabilityState {
let Some(last_success) = info.last_success else {
// Don't make decisions about instances which didn't previously
// respond once so as to not interrup the process of booting up.
......@@ -102,7 +108,7 @@ impl InstanceReachabilityManager {
return Reachable;
}
let now = fiber::clock();
if now.duration_since(last_success) > Self::MAX_TIME_SINCE_SUCCESS {
if now.duration_since(last_success) > self.auto_offline_timeout() {
Unreachable
} else {
Reachable
......@@ -149,13 +155,45 @@ impl InstanceReachabilityManager {
//
let now = fiber::clock();
let since_last_success = last_attempt.duration_since(last_success);
let delay = since_last_success.min(Self::MAX_HEARTBEAT_PERIOD);
let delay = since_last_success.min(self.max_heartbeat_period());
if now > last_attempt + delay {
return true;
}
return false;
}
fn auto_offline_timeout(&self) -> Duration {
// TODO: it would be better for cache locality and overall performance
// if we don't look this value up in the storage every time. Instead we
// could store it in a field of this struct and only update it's value
// once per raft loop iteration by calling a method update_configuration
// or something like that.
if let Some(storage) = &self.storage {
// FIXME: silently ignoring an error if the user specified a value
// of the wrong type.
if let Ok(Some(t)) = storage.properties.get(PropertyName::AutoOfflineTimeout) {
return Duration::from_secs_f64(t);
}
};
Self::DEFAULT_AUTO_OFFLINE_TIMEOUT
}
fn max_heartbeat_period(&self) -> Duration {
// TODO: it would be better for cache locality and overall performance
// if we don't look this value up in the storage every time. Instead we
// could store it in a field of this struct and only update it's value
// once per raft loop iteration by calling a method update_configuration
// or something like that.
if let Some(storage) = &self.storage {
// FIXME: silently ignoring an error if the user specified a value
// of the wrong type.
if let Ok(Some(t)) = storage.properties.get(PropertyName::MaxHeartbeatPeriod) {
return Duration::from_secs_f64(t);
}
};
Self::DEFAULT_MAX_HEARTBEAT_PERIOD
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
......
......@@ -992,6 +992,14 @@ pub trait TClusterwideSpaceIndex {
NextSchemaVersion = "next_schema_version",
PasswordMinLength = "password_min_length",
/// Number of seconds to wait before automatically changing an
/// unresponsive instance's grade to Offline.
AutoOfflineTimeout = "auto_offline_timeout",
/// Maximum number of seconds to wait before sending another heartbeat
/// to an unresponsive instance.
MaxHeartbeatPeriod = "max_heartbeat_period",
}
}
......
......@@ -156,13 +156,15 @@ def get_instance_grades(peer: Instance, instance_id) -> tuple[str, str]:
def test_instance_automatic_offline_detection(cluster: Cluster):
i1, i2, i3 = cluster.deploy(instance_count=3)
index = cluster.cas("insert", "_pico_property", ["auto_offline_timeout", 0.5])
cluster.raft_wait_index(index, 3)
assert get_instance_grades(i1, i3.instance_id) == ("Online", "Online")
i3.kill()
# Give the governor some time to detect the problem and act accordingly.
time.sleep(10)
# Give the sentinel some time to detect the problem and act accordingly.
time.sleep(2)
assert get_instance_grades(i1, i3.instance_id) == ("Offline", "Offline")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment