diff --git a/src/lib.rs b/src/lib.rs index adfccd652edb15b479c1012aa93f6b337f854cf9..a7e8fc0c0456fb088fc875c6e718d1850ed5227f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ pub mod r#loop; mod luamod; pub mod mailbox; pub mod on_shutdown; +pub mod reachability; pub mod replicaset; pub mod rpc; pub mod schema; diff --git a/src/reachability.rs b/src/reachability.rs new file mode 100644 index 0000000000000000000000000000000000000000..366ed585e7a549d3d26aff47fe9327e3fe4520ef --- /dev/null +++ b/src/reachability.rs @@ -0,0 +1,176 @@ +use crate::storage::Clusterwide; +use crate::traft::RaftId; +use std::cell::RefCell; +use std::collections::HashMap; +use std::collections::HashSet; +use std::rc::Rc; +use std::time::Duration; +use tarantool::fiber; +use tarantool::time::Instant; + +//////////////////////////////////////////////////////////////////////////////// +// InstanceReachabilityManager +//////////////////////////////////////////////////////////////////////////////// + +/// A struct holding information about reported attempts to communicate with +/// all known instances. +#[derive(Debug, Default)] +pub struct InstanceReachabilityManager { + // TODO: Will be used to read configuration from + #[allow(unused)] + storage: Option<Clusterwide>, + infos: HashMap<RaftId, InstanceReachabilityInfo>, +} + +pub type InstanceReachabilityManagerRef = Rc<RefCell<InstanceReachabilityManager>>; + +impl InstanceReachabilityManager { + // TODO: make these configurable via _pico_property + const MAX_TIME_SINCE_SUCCESS: Duration = Duration::from_secs(5); + const MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5); + + pub fn new(storage: Clusterwide) -> Self { + Self { + storage: Some(storage), + infos: Default::default(), + } + } + + /// Is called from a connection pool worker loop to report results of raft + /// messages sent to other instances. For example a timeout is considered + /// a failure. Updates info for the given instance. 
+ pub fn report_result(&mut self, raft_id: RaftId, success: bool) { + let now = fiber::clock(); + let info = self.infos.entry(raft_id).or_insert_with(Default::default); + info.last_attempt = Some(now); + + // Even if it was previously reported as unreachable, another message was + // sent to it, so raft node state may have changed and another + // report_unreachable may be needed. + info.is_reported = false; + + if success { + info.last_success = Some(now); + info.fail_streak = 0; + // If it was previously reported as unreachable, it's now reachable, so + // next time it becomes unreachable it should be reported again. + } else { + info.fail_streak += 1; + } + } + + /// Is called at the beginning of the raft main loop to get information + /// about which instances should be reported as unreachable to the raft node. + pub fn take_unreachables_to_report(&mut self) -> Vec<RaftId> { + let mut res = Vec::with_capacity(16); + for (raft_id, info) in &mut self.infos { + if info.is_reported { + // Don't report nodes repeatedly. + continue; + } + if Self::determine_reachability(info) == Unreachable { + res.push(*raft_id); + info.is_reported = true; + } + } + res + } + + /// Is called by sentinel to get information about which instances should be + /// automatically assigned a different grade. + pub fn get_unreachables(&self) -> HashSet<RaftId> { + let mut res = HashSet::with_capacity(self.infos.len() / 3); + for (raft_id, info) in &self.infos { + if Self::determine_reachability(info) == Unreachable { + res.insert(*raft_id); + } + } + return res; + } + + /// Make a decision on the given instance's reachability based on the + /// provided `info`. This is an internal function. + fn determine_reachability(info: &InstanceReachabilityInfo) -> ReachabilityState { + let Some(last_success) = info.last_success else { + // Don't make decisions about instances which didn't previously + // respond once so as to not interrupt the process of booting up. 
+ // TODO: report unreachable if fail_streak is big enough. + return Undecided; + }; + if info.fail_streak == 0 { + // No failures since the last success, so can't be unreachable. + return Reachable; + } + let now = fiber::clock(); + if now.duration_since(last_success) > Self::MAX_TIME_SINCE_SUCCESS { + Unreachable + } else { + Reachable + } + } + + /// Is called from raft main loop when handling raft messages, passing a + /// raft id of an instance which was previously determined to be unreachable. + /// This function makes a decision about how often raft heartbeat messages + /// should be sent to such instances. + pub fn should_send_heartbeat_this_tick(&self, to: RaftId) -> bool { + let Some(info) = self.infos.get(&to) else { + // No attempts were registered yet. + return true; + }; + + if info.fail_streak == 0 { + // Last attempt was successful, keep going. + return true; + } + + let Some(last_success) = info.last_success else { + // Didn't succeed once, keep trying. + return true; + }; + + let last_attempt = info + .last_attempt + .expect("this should be set if info was reported"); + + // Exponential backoff, capped at MAX_HEARTBEAT_PERIOD. + // time: -----*---------*---------*-------------------*----------------> + // ^ ^ ^ ^ + // last_success attempt1 attempt2 attempt3 ... + // + // |<------->|<------->| + // D1 D1 + // |<----------------->|<----------------->| + // D2 D2 + // |<------------------------------------->| + // D3 + // ... + // DN == attemptN.duration_since(last_success) + // + let now = fiber::clock(); + let since_last_success = last_attempt.duration_since(last_success); + let delay = since_last_success.min(Self::MAX_HEARTBEAT_PERIOD); + if now > last_attempt + delay { + return true; + } + + return false; + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ReachabilityState { + Undecided, + Reachable, + Unreachable, +} +use ReachabilityState::*; + +/// Information about recent attempts to communicate with a single given instance. 
+#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct InstanceReachabilityInfo { + pub last_success: Option<Instant>, + pub last_attempt: Option<Instant>, + pub fail_streak: u32, + pub is_reported: bool, +} diff --git a/src/sentinel.rs b/src/sentinel.rs index 5ef46dcc1d9fe1b4bcc5fb82f03e534a839f53e7..577871dc9b5d9a337e5cdfd9554a0f544b55a36f 100644 --- a/src/sentinel.rs +++ b/src/sentinel.rs @@ -1,12 +1,12 @@ use crate::has_grades; use crate::instance::grade::TargetGradeVariant; use crate::r#loop::FlowControl::{self, Break, Continue}; +use crate::reachability::InstanceReachabilityManagerRef; use crate::rpc; use crate::storage::Clusterwide; use crate::tlog; use crate::traft::error::Error; use crate::traft::network::ConnectionPool; -use crate::traft::network::InstanceReachabilityManagerRef; use crate::traft::{node, RaftSpaceAccess}; use ::tarantool::fiber; use ::tarantool::fiber::r#async::timeout::IntoTimeout as _; diff --git a/src/traft/network.rs b/src/traft/network.rs index dceb03c591ab13be3e22fbf5ad8f886607f7ec96..785f9d20e39cb3cc17cc474b4105b73527f653c8 100644 --- a/src/traft/network.rs +++ b/src/traft/network.rs @@ -1,3 +1,14 @@ +use crate::instance::InstanceId; +use crate::mailbox::Mailbox; +use crate::reachability::InstanceReachabilityManagerRef; +use crate::rpc; +use crate::storage::{instance_field, Clusterwide, Instances, PeerAddresses}; +use crate::tlog; +use crate::traft; +use crate::traft::error::Error; +use crate::traft::RaftId; +use crate::traft::Result; +use crate::unwrap_ok_or; use ::raft::prelude as raft; use ::tarantool::fiber; use ::tarantool::fiber::r#async::oneshot; @@ -13,28 +24,13 @@ use ::tarantool::util::IntoClones; use futures::future::poll_fn; use futures::Future; use futures::FutureExt as _; -use std::cell::RefCell; use std::cell::UnsafeCell; use std::collections::HashMap; -use std::collections::HashSet; use std::collections::VecDeque; use std::pin::Pin; -use std::rc::Rc; use std::task::Poll; use 
std::time::Duration; use tarantool::fiber::r#async::timeout; -use tarantool::time::Instant; - -use crate::instance::InstanceId; -use crate::mailbox::Mailbox; -use crate::rpc; -use crate::storage::{instance_field, Clusterwide, Instances, PeerAddresses}; -use crate::tlog; -use crate::traft; -use crate::traft::error::Error; -use crate::traft::RaftId; -use crate::traft::Result; -use crate::unwrap_ok_or; pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(3); pub const DEFAULT_CUNCURRENT_FUTURES: usize = 10; @@ -393,173 +389,6 @@ impl std::fmt::Debug for PoolWorker { } } -//////////////////////////////////////////////////////////////////////////////// -// InstanceReachabilityManager -//////////////////////////////////////////////////////////////////////////////// - -/// A struct holding information about reported attempts to communicate with -/// all known instances. -#[derive(Debug, Default)] -pub struct InstanceReachabilityManager { - // TODO: Will be used to read configuration from - #[allow(unused)] - storage: Option<Clusterwide>, - infos: HashMap<RaftId, InstanceReachabilityInfo>, -} - -pub type InstanceReachabilityManagerRef = Rc<RefCell<InstanceReachabilityManager>>; - -impl InstanceReachabilityManager { - // TODO: make these configurable via _pico_property - const MAX_TIME_SINCE_SUCCESS: Duration = Duration::from_secs(5); - const MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5); - - pub fn new(storage: Clusterwide) -> Self { - Self { - storage: Some(storage), - infos: Default::default(), - } - } - - /// Is called from a connection pool worker loop to report results of raft - /// messages sent to other instances. For example a timeout is considered - /// a failure. Updates info for the given instance. 
- pub fn report_result(&mut self, raft_id: RaftId, success: bool) { - let now = fiber::clock(); - let info = self.infos.entry(raft_id).or_insert_with(Default::default); - info.last_attempt = Some(now); - - // Even if it was previously reported as unreachable another message was - // sent to it, so raft node state may have changed and another - // report_unreachable me be needed. - info.is_reported = false; - - if success { - info.last_success = Some(now); - info.fail_streak = 0; - // If was previously reported as unreachable, it's now reachable so - // next time it should again be reported as unreachable. - } else { - info.fail_streak += 1; - } - } - - /// Is called at the beginning of the raft main loop to get information - /// about which instances should be reported as unreachable to the raft node. - pub fn take_unreachables_to_report(&mut self) -> Vec<RaftId> { - let mut res = Vec::with_capacity(16); - for (raft_id, info) in &mut self.infos { - if info.is_reported { - // Don't report nodes repeatedly. - continue; - } - if Self::determine_reachability(info) == Unreachable { - res.push(*raft_id); - info.is_reported = true; - } - } - res - } - - /// Is called by sentinel to get information about which instances should be - /// automatically assigned a different grade. - pub fn get_unreachables(&self) -> HashSet<RaftId> { - let mut res = HashSet::with_capacity(self.infos.len() / 3); - for (raft_id, info) in &self.infos { - if Self::determine_reachability(info) == Unreachable { - res.insert(*raft_id); - } - } - return res; - } - - /// Make a descision on the given instance's reachability based on the - /// provided `info`. This is an internal function. - fn determine_reachability(info: &InstanceReachabilityInfo) -> ReachabilityState { - let Some(last_success) = info.last_success else { - // Don't make decisions about instances which didn't previously - // respond once so as to not interrup the process of booting up. 
- // TODO: report unreachable if fail_streak is big enough. - return Undecided; - }; - if info.fail_streak == 0 { - // Didn't fail once, so can't be unreachable. - return Reachable; - } - let now = fiber::clock(); - if now.duration_since(last_success) > Self::MAX_TIME_SINCE_SUCCESS { - Unreachable - } else { - Reachable - } - } - - /// Is called from raft main loop when handling raft messages, passing a - /// raft id of an instance which was previously determined to be unreachable. - /// This function makes a decision about how often raft hearbeat messages - /// should be sent to such instances. - pub fn should_send_heartbeat_this_tick(&self, to: RaftId) -> bool { - let Some(info) = self.infos.get(&to) else { - // No attempts were registered yet. - return true; - }; - - if info.fail_streak == 0 { - // Last attempt was successful, keep going. - return true; - } - - let Some(last_success) = info.last_success else { - // Didn't succeed once, keep trying. - return true; - }; - - let last_attempt = info - .last_attempt - .expect("this should be set if info was reported"); - - // Expontential decay. - // time: -----*---------*---------*-------------------*----------------> - // ^ ^ ^ ^ - // last_success attempt1 attempt2 attempt3 ... - // - // |<------->|<------->| - // D1 D1 - // |<----------------->|<----------------->| - // D2 D2 - // |<------------------------------------->| - // D3 - // ... - // DN == attemptN.duration_since(last_success) - // - let now = fiber::clock(); - let since_last_success = last_attempt.duration_since(last_success); - let delay = since_last_success.min(Self::MAX_HEARTBEAT_PERIOD); - if now > last_attempt + delay { - return true; - } - - return false; - } -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum ReachabilityState { - Undecided, - Reachable, - Unreachable, -} -use ReachabilityState::*; - -/// Information about recent attempts to communicate with a single given instance. 
-#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct InstanceReachabilityInfo { - pub last_success: Option<Instant>, - pub last_attempt: Option<Instant>, - pub fail_streak: u32, - pub is_reported: bool, -} - //////////////////////////////////////////////////////////////////////////////// // ConnectionPool //////////////////////////////////////////////////////////////////////////////// diff --git a/src/traft/node.rs b/src/traft/node.rs index 8ddad96eae3e02aa8bf1038b063bb96294d1541b..effa59e3a95c8b19106a3884a80da8bca3eb1b17 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -11,6 +11,7 @@ use crate::instance::Instance; use crate::kvcell::KVCell; use crate::loop_start; use crate::r#loop::FlowControl; +use crate::reachability::InstanceReachabilityManager; use crate::rpc; use crate::schema::{Distribution, IndexDef, SpaceDef}; use crate::sentinel; @@ -27,7 +28,6 @@ use crate::traft; use crate::traft::error::Error; use crate::traft::event; use crate::traft::event::Event; -use crate::traft::network::InstanceReachabilityManager; use crate::traft::network::WorkerOptions; use crate::traft::notify::{notification, Notifier, Notify}; use crate::traft::op::{Acl, Ddl, Dml, Op, OpResult};