Skip to content
Snippets Groups Projects
Commit 1daae4f6 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Yaroslav Dynnikov
Browse files

refactor: extract src/ module

parent da6c515c
No related branches found
No related tags found
1 merge request!624Feat/self-healing
......@@ -39,6 +39,7 @@ pub mod r#loop;
mod luamod;
pub mod mailbox;
pub mod on_shutdown;
pub mod reachability;
pub mod replicaset;
pub mod rpc;
pub mod schema;
use crate::storage::Clusterwide;
use crate::traft::RaftId;
use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::rc::Rc;
use std::time::Duration;
use tarantool::fiber;
use tarantool::time::Instant;
// InstanceReachabilityManager
/// A struct holding information about reported attempts to communicate with
/// all known instances.
#[derive(Debug, Default)]
pub struct InstanceReachabilityManager {
// TODO: Will be used to read configuration from
storage: Option<Clusterwide>,
infos: HashMap<RaftId, InstanceReachabilityInfo>,
pub type InstanceReachabilityManagerRef = Rc<RefCell<InstanceReachabilityManager>>;
impl InstanceReachabilityManager {
// TODO: make these configurable via _pico_property
const MAX_TIME_SINCE_SUCCESS: Duration = Duration::from_secs(5);
const MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5);
pub fn new(storage: Clusterwide) -> Self {
Self {
storage: Some(storage),
infos: Default::default(),
/// Is called from a connection pool worker loop to report results of raft
/// messages sent to other instances. For example a timeout is considered
/// a failure. Updates info for the given instance.
pub fn report_result(&mut self, raft_id: RaftId, success: bool) {
let now = fiber::clock();
let info = self.infos.entry(raft_id).or_insert_with(Default::default);
info.last_attempt = Some(now);
// Even if it was previously reported as unreachable another message was
// sent to it, so raft node state may have changed and another
// report_unreachable me be needed.
info.is_reported = false;
if success {
info.last_success = Some(now);
info.fail_streak = 0;
// If was previously reported as unreachable, it's now reachable so
// next time it should again be reported as unreachable.
} else {
info.fail_streak += 1;
/// Is called at the beginning of the raft main loop to get information
/// about which instances should be reported as unreachable to the raft node.
pub fn take_unreachables_to_report(&mut self) -> Vec<RaftId> {
let mut res = Vec::with_capacity(16);
for (raft_id, info) in &mut self.infos {
if info.is_reported {
// Don't report nodes repeatedly.
if Self::determine_reachability(info) == Unreachable {
info.is_reported = true;
/// Is called by sentinel to get information about which instances should be
/// automatically assigned a different grade.
pub fn get_unreachables(&self) -> HashSet<RaftId> {
let mut res = HashSet::with_capacity(self.infos.len() / 3);
for (raft_id, info) in &self.infos {
if Self::determine_reachability(info) == Unreachable {
return res;
/// Make a descision on the given instance's reachability based on the
/// provided `info`. This is an internal function.
fn determine_reachability(info: &InstanceReachabilityInfo) -> ReachabilityState {
let Some(last_success) = info.last_success else {
// Don't make decisions about instances which didn't previously
// respond once so as to not interrup the process of booting up.
// TODO: report unreachable if fail_streak is big enough.
return Undecided;
if info.fail_streak == 0 {
// Didn't fail once, so can't be unreachable.
return Reachable;
let now = fiber::clock();
if now.duration_since(last_success) > Self::MAX_TIME_SINCE_SUCCESS {
} else {
/// Is called from raft main loop when handling raft messages, passing a
/// raft id of an instance which was previously determined to be unreachable.
/// This function makes a decision about how often raft hearbeat messages
/// should be sent to such instances.
pub fn should_send_heartbeat_this_tick(&self, to: RaftId) -> bool {
let Some(info) = self.infos.get(&to) else {
// No attempts were registered yet.
return true;
if info.fail_streak == 0 {
// Last attempt was successful, keep going.
return true;
let Some(last_success) = info.last_success else {
// Didn't succeed once, keep trying.
return true;
let last_attempt = info
.expect("this should be set if info was reported");
// Expontential decay.
// time: -----*---------*---------*-------------------*---------------->
// ^ ^ ^ ^
// last_success attempt1 attempt2 attempt3 ...
// |<------->|<------->|
// D1 D1
// |<----------------->|<----------------->|
// D2 D2
// |<------------------------------------->|
// D3
// ...
// DN == attemptN.duration_since(last_success)
let now = fiber::clock();
let since_last_success = last_attempt.duration_since(last_success);
let delay = since_last_success.min(Self::MAX_HEARTBEAT_PERIOD);
if now > last_attempt + delay {
return true;
return false;
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ReachabilityState {
use ReachabilityState::*;
/// Information about recent attempts to communicate with a single given instance.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct InstanceReachabilityInfo {
pub last_success: Option<Instant>,
pub last_attempt: Option<Instant>,
pub fail_streak: u32,
pub is_reported: bool,
use crate::has_grades;
use crate::instance::grade::TargetGradeVariant;
use crate::r#loop::FlowControl::{self, Break, Continue};
use crate::reachability::InstanceReachabilityManagerRef;
use crate::rpc;
use crate::storage::Clusterwide;
use crate::tlog;
use crate::traft::error::Error;
use crate::traft::network::ConnectionPool;
use crate::traft::network::InstanceReachabilityManagerRef;
use crate::traft::{node, RaftSpaceAccess};
use ::tarantool::fiber;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use crate::instance::InstanceId;
use crate::mailbox::Mailbox;
use crate::reachability::InstanceReachabilityManagerRef;
use crate::rpc;
use crate::storage::{instance_field, Clusterwide, Instances, PeerAddresses};
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::RaftId;
use crate::traft::Result;
use crate::unwrap_ok_or;
use ::raft::prelude as raft;
use ::tarantool::fiber;
use ::tarantool::fiber::r#async::oneshot;
......@@ -13,28 +24,13 @@ use ::tarantool::util::IntoClones;
use futures::future::poll_fn;
use futures::Future;
use futures::FutureExt as _;
use std::cell::RefCell;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Poll;
use std::time::Duration;
use tarantool::fiber::r#async::timeout;
use tarantool::time::Instant;
use crate::instance::InstanceId;
use crate::mailbox::Mailbox;
use crate::rpc;
use crate::storage::{instance_field, Clusterwide, Instances, PeerAddresses};
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::RaftId;
use crate::traft::Result;
use crate::unwrap_ok_or;
pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(3);
pub const DEFAULT_CUNCURRENT_FUTURES: usize = 10;
......@@ -393,173 +389,6 @@ impl std::fmt::Debug for PoolWorker {
// InstanceReachabilityManager
/// A struct holding information about reported attempts to communicate with
/// all known instances.
#[derive(Debug, Default)]
pub struct InstanceReachabilityManager {
// TODO: Will be used to read configuration from
storage: Option<Clusterwide>,
infos: HashMap<RaftId, InstanceReachabilityInfo>,
pub type InstanceReachabilityManagerRef = Rc<RefCell<InstanceReachabilityManager>>;
impl InstanceReachabilityManager {
// TODO: make these configurable via _pico_property
const MAX_TIME_SINCE_SUCCESS: Duration = Duration::from_secs(5);
const MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5);
pub fn new(storage: Clusterwide) -> Self {
Self {
storage: Some(storage),
infos: Default::default(),
/// Is called from a connection pool worker loop to report results of raft
/// messages sent to other instances. For example a timeout is considered
/// a failure. Updates info for the given instance.
pub fn report_result(&mut self, raft_id: RaftId, success: bool) {
let now = fiber::clock();
let info = self.infos.entry(raft_id).or_insert_with(Default::default);
info.last_attempt = Some(now);
// Even if it was previously reported as unreachable another message was
// sent to it, so raft node state may have changed and another
// report_unreachable me be needed.
info.is_reported = false;
if success {
info.last_success = Some(now);
info.fail_streak = 0;
// If was previously reported as unreachable, it's now reachable so
// next time it should again be reported as unreachable.
} else {
info.fail_streak += 1;
/// Is called at the beginning of the raft main loop to get information
/// about which instances should be reported as unreachable to the raft node.
pub fn take_unreachables_to_report(&mut self) -> Vec<RaftId> {
let mut res = Vec::with_capacity(16);
for (raft_id, info) in &mut self.infos {
if info.is_reported {
// Don't report nodes repeatedly.
if Self::determine_reachability(info) == Unreachable {
info.is_reported = true;
/// Is called by sentinel to get information about which instances should be
/// automatically assigned a different grade.
pub fn get_unreachables(&self) -> HashSet<RaftId> {
let mut res = HashSet::with_capacity(self.infos.len() / 3);
for (raft_id, info) in &self.infos {
if Self::determine_reachability(info) == Unreachable {
return res;
/// Make a descision on the given instance's reachability based on the
/// provided `info`. This is an internal function.
fn determine_reachability(info: &InstanceReachabilityInfo) -> ReachabilityState {
let Some(last_success) = info.last_success else {
// Don't make decisions about instances which didn't previously
// respond once so as to not interrup the process of booting up.
// TODO: report unreachable if fail_streak is big enough.
return Undecided;
if info.fail_streak == 0 {
// Didn't fail once, so can't be unreachable.
return Reachable;
let now = fiber::clock();
if now.duration_since(last_success) > Self::MAX_TIME_SINCE_SUCCESS {
} else {
/// Is called from raft main loop when handling raft messages, passing a
/// raft id of an instance which was previously determined to be unreachable.
/// This function makes a decision about how often raft hearbeat messages
/// should be sent to such instances.
pub fn should_send_heartbeat_this_tick(&self, to: RaftId) -> bool {
let Some(info) = self.infos.get(&to) else {
// No attempts were registered yet.
return true;
if info.fail_streak == 0 {
// Last attempt was successful, keep going.
return true;
let Some(last_success) = info.last_success else {
// Didn't succeed once, keep trying.
return true;
let last_attempt = info
.expect("this should be set if info was reported");
// Expontential decay.
// time: -----*---------*---------*-------------------*---------------->
// ^ ^ ^ ^
// last_success attempt1 attempt2 attempt3 ...
// |<------->|<------->|
// D1 D1
// |<----------------->|<----------------->|
// D2 D2
// |<------------------------------------->|
// D3
// ...
// DN == attemptN.duration_since(last_success)
let now = fiber::clock();
let since_last_success = last_attempt.duration_since(last_success);
let delay = since_last_success.min(Self::MAX_HEARTBEAT_PERIOD);
if now > last_attempt + delay {
return true;
return false;
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ReachabilityState {
use ReachabilityState::*;
/// Information about recent attempts to communicate with a single given instance.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct InstanceReachabilityInfo {
pub last_success: Option<Instant>,
pub last_attempt: Option<Instant>,
pub fail_streak: u32,
pub is_reported: bool,
// ConnectionPool
......@@ -11,6 +11,7 @@ use crate::instance::Instance;
use crate::kvcell::KVCell;
use crate::loop_start;
use crate::r#loop::FlowControl;
use crate::reachability::InstanceReachabilityManager;
use crate::rpc;
use crate::schema::{Distribution, IndexDef, SpaceDef};
use crate::sentinel;
......@@ -27,7 +28,6 @@ use crate::traft;
use crate::traft::error::Error;
use crate::traft::event;
use crate::traft::event::Event;
use crate::traft::network::InstanceReachabilityManager;
use crate::traft::network::WorkerOptions;
use crate::traft::notify::{notification, Notifier, Notify};
use crate::traft::op::{Acl, Ddl, Dml, Op, OpResult};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment