From 3279e77c92ff2f8f478c2abd80f4da0d300235c7 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 1 Aug 2023 10:16:33 +0300
Subject: [PATCH] fix: unreachable instances are no longer spammed with raft
 messages

---
 src/lib.rs           |   1 +
 src/traft/network.rs | 199 ++++++++++++++++++++++++++++++++++++++++---
 src/traft/node.rs    |  41 ++++++++-
 3 files changed, 225 insertions(+), 16 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 95b33208f2..3cc8b199d1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,4 @@
+#![allow(clippy::too_many_arguments)]
 #![allow(clippy::let_and_return)]
 #![allow(clippy::needless_return)]
 use serde::{Deserialize, Serialize};
diff --git a/src/traft/network.rs b/src/traft/network.rs
index 56ba6a2964..5a93d6db5d 100644
--- a/src/traft/network.rs
+++ b/src/traft/network.rs
@@ -1,23 +1,28 @@
 use ::raft::prelude as raft;
 use ::tarantool::fiber;
 use ::tarantool::fiber::r#async::oneshot;
+use ::tarantool::fiber::r#async::timeout::Error as TOError;
 use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
 use ::tarantool::fiber::r#async::watch;
 use ::tarantool::network;
 use ::tarantool::network::AsClient as _;
+use ::tarantool::network::Error as NetError;
 use ::tarantool::network::ReconnClient;
 use ::tarantool::tuple::{ToTupleBuffer, Tuple, TupleBuffer};
 use ::tarantool::util::IntoClones;
 use futures::future::poll_fn;
 use futures::Future;
 use futures::FutureExt as _;
+use std::cell::RefCell;
 use std::cell::UnsafeCell;
 use std::collections::HashMap;
 use std::collections::VecDeque;
 use std::pin::Pin;
+use std::rc::Rc;
 use std::task::Poll;
 use std::time::Duration;
 use tarantool::fiber::r#async::timeout;
+use tarantool::time::Instant;
 
 use crate::instance::InstanceId;
 use crate::mailbox::Mailbox;
@@ -58,12 +63,12 @@ struct Request {
     proc: &'static str,
     args: TupleBuffer,
     timeout: Option<Duration>,
-    on_result: Box<dyn FnOnce(Result<Tuple>)>,
+    on_result: OnRequestResult,
 }
 
 impl Request {
     #[inline(always)]
-    fn new<H>(proc: &'static str, args: TupleBuffer, on_result: H) -> Self
+    fn with_callback<H>(proc: &'static str, args: TupleBuffer, on_result: H) -> Self
     where
         H: FnOnce(Result<Tuple>) + 'static,
     {
@@ -71,9 +76,24 @@ impl Request {
             proc,
             args,
             timeout: None,
-            on_result: Box::new(on_result),
+            on_result: OnRequestResult::Callback(Box::new(on_result)),
         }
     }
+
+    #[inline(always)]
+    fn raft_msg(proc: &'static str, args: TupleBuffer) -> Self {
+        Self {
+            proc,
+            args,
+            timeout: None,
+            on_result: OnRequestResult::ReportUnreachable,
+        }
+    }
+}
+
+enum OnRequestResult {
+    Callback(Box<dyn FnOnce(Result<Tuple>)>),
+    ReportUnreachable,
 }
 
 type Queue = Mailbox<Request>;
@@ -105,6 +125,7 @@ impl PoolWorker {
         instance_id: impl Into<Option<InstanceId>>,
         storage: PeerAddresses,
         opts: WorkerOptions,
+        instance_reachability: InstanceReachabilityManagerRef,
     ) -> Result<PoolWorker> {
         let inbox = Mailbox::new();
         let (stop_sender, stop_receiver) = oneshot::channel();
@@ -130,12 +151,14 @@ impl PoolWorker {
                 async move {
                     futures::select! {
                         _ = Self::worker_loop(
+                                raft_id,
                                 inbox,
                                 inbox_ready_receiver,
                                 address,
                                 port,
                                 opts.call_timeout,
-                                opts.max_concurrent_futs
+                                opts.max_concurrent_futs,
+                                instance_reachability
                             ).fuse() => (),
                         _ = stop_receiver.fuse() => ()
                     }
@@ -156,12 +179,14 @@ impl PoolWorker {
     }
 
     async fn worker_loop(
+        raft_id: RaftId,
         inbox: Queue,
         mut inbox_ready: watch::Receiver<()>,
         address: String,
         port: u16,
         call_timeout: Duration,
         max_concurrent_fut: usize,
+        instance_reachability: InstanceReachabilityManagerRef,
     ) {
         let client = ReconnClient::new(address.clone(), port);
         let mut client_ver: usize = 0;
@@ -201,6 +226,8 @@ impl PoolWorker {
             poll_fn(|cx| {
                 let mut has_ready: bool = false;
                 let mut cursor = 0;
+                // NOTE: must not yield until this is dropped.
+                let mut reachability = instance_reachability.borrow_mut();
                 while cursor < futures.len() {
                     let poll_result = Future::poll(futures[cursor].2.as_mut(), cx);
                     if let Poll::Ready(result) = poll_result {
@@ -218,7 +245,26 @@ impl PoolWorker {
                                 None => highest_client_ver = Some(client_ver),
                             }
                         }
-                        on_result(result.map_err(Error::from));
+                        match on_result {
+                            OnRequestResult::Callback(cb) => {
+                                cb(result.map_err(Error::from));
+                            }
+                            OnRequestResult::ReportUnreachable => {
+                                let mut success = true;
+                                if let Err(e) = result {
+                                    tlog!(Warning, "error when sending message to peer: {e}";
+                                        "raft_id" => raft_id,
+                                    );
+                                    success = !matches!(
+                                        e,
+                                        TOError::Expired
+                                            | TOError::Failed(NetError::Tcp(_))
+                                            | TOError::Failed(NetError::Io(_))
+                                    );
+                                }
+                                reachability.report_result(raft_id, success);
+                            }
+                        }
                         has_ready = true;
                     } else {
                         cursor += 1;
@@ -247,14 +293,8 @@ impl PoolWorker {
         let raft_id = msg.to;
         let msg = traft::MessagePb::from(msg);
         let args = [msg].to_tuple_buffer()?;
-        let on_result = move |res| match res {
-            Ok(_) => (),
-            Err(e) => tlog!(Warning, "error when sending message to peer: {e}";
-                "raft_id" => raft_id,
-            ),
-        };
         self.inbox
-            .send(Request::new(self.raft_msg_handler, args, on_result));
+            .send(Request::raft_msg(self.raft_msg_handler, args));
         if self.inbox_ready.send(()).is_err() {
             tlog!(Warning, "failed sending request to peer, worker loop receiver dropped";
                 "raft_id" => raft_id,
@@ -326,7 +366,7 @@ impl PoolWorker {
             let ((res,),) = tuple.decode()?;
             Ok(res)
         };
-        let mut request = Request::new(proc, args, move |res| cb(convert_result(res)));
+        let mut request = Request::with_callback(proc, args, move |res| cb(convert_result(res)));
         request.timeout = timeout;
         self.inbox.send(request);
         if self.inbox_ready.send(()).is_err() {
@@ -352,6 +392,136 @@ impl std::fmt::Debug for PoolWorker {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// InstanceReachabilityManager
+////////////////////////////////////////////////////////////////////////////////
+
+/// A struct holding information about reported attempts to communicate with
+/// all known instances.
+#[derive(Debug, Default)]
+pub struct InstanceReachabilityManager {
+    // TODO: Will be used to read configuration from
+    #[allow(unused)]
+    storage: Option<Clusterwide>,
+    infos: HashMap<RaftId, InstanceReachabilityInfo>,
+}
+
+pub type InstanceReachabilityManagerRef = Rc<RefCell<InstanceReachabilityManager>>;
+
+impl InstanceReachabilityManager {
+    // TODO: make these configurable via _pico_property
+    const MAX_HEARTBEAT_PERIOD: Duration = Duration::from_secs(5);
+
+    pub fn new(storage: Clusterwide) -> Self {
+        Self {
+            storage: Some(storage),
+            infos: Default::default(),
+        }
+    }
+
+    /// Is called from a connection pool worker loop to report results of raft
+    /// messages sent to other instances. Updates info for the given instance.
+    pub fn report_result(&mut self, raft_id: RaftId, success: bool) {
+        let now = fiber::clock();
+        let info = self.infos.entry(raft_id).or_insert_with(Default::default);
+        info.last_attempt = Some(now);
+
+        // Even if it was previously reported as unreachable another message was
+        // sent to it, so raft node state may have changed and another
+        // report_unreachable me be needed.
+        info.is_reported = false;
+
+        if success {
+            info.last_success = Some(now);
+            info.fail_streak = 0;
+            // If was previously reported as unreachable, it's now reachable so
+            // next time it should again be reported as unreachable.
+        } else {
+            info.fail_streak += 1;
+        }
+    }
+
+    /// Is called at the beginning of the raft main loop to get information
+    /// about which instances should be reported as unreachable to the raft node.
+    pub fn take_unreachables_to_report(&mut self) -> Vec<RaftId> {
+        let mut res = Vec::with_capacity(16);
+        for (raft_id, info) in &mut self.infos {
+            if info.last_success.is_none() {
+                // Don't report nodes which didn't previously respond once,
+                // so that they can safely boot atleast.
+                continue;
+            }
+            if info.is_reported {
+                // Don't report nodes repeatedly.
+                continue;
+            }
+            // TODO: add configurable parameters, for example number of attempts
+            // before report.
+            if info.fail_streak > 0 {
+                res.push(*raft_id);
+                info.is_reported = true;
+            }
+        }
+        res
+    }
+
+    /// Is called from raft main loop when handling raft messages, passing a
+    /// raft id of an instance which was previously determined to be unreachable.
+    /// This function makes a decision about how often raft hearbeat messages
+    /// should be sent to such instances.
+    pub fn should_send_heartbeat_this_tick(&self, to: RaftId) -> bool {
+        let Some(info) = self.infos.get(&to) else {
+            // No attempts were registered yet.
+            return true;
+        };
+
+        if info.fail_streak == 0 {
+            // Last attempt was successful, keep going.
+            return true;
+        }
+
+        let Some(last_success) = info.last_success else {
+            // Didn't succeed once, keep trying.
+            return true;
+        };
+
+        let last_attempt = info
+            .last_attempt
+            .expect("this should be set if info was reported");
+
+        // Expontential decay.
+        // time: -----*---------*---------*-------------------*---------------->
+        //            ^         ^         ^                   ^
+        // last_success         attempt1  attempt2            attempt3   ...
+        //
+        //            |<------->|<------->|
+        //                D1         D1
+        //            |<----------------->|<----------------->|
+        //                     D2                   D2
+        //            |<------------------------------------->|
+        //                                D3
+        //                                ...
+        // DN == attemptN.duration_since(last_success)
+        //
+        let now = fiber::clock();
+        let wait_timeout = last_attempt.duration_since(last_success).min(Self::MAX_HEARTBEAT_PERIOD);
+        if now > last_attempt + wait_timeout {
+            return true;
+        }
+
+        return false;
+    }
+}
+
+/// Information about recent attempts to communicate with a single given instance.
+#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct InstanceReachabilityInfo {
+    pub last_success: Option<Instant>,
+    pub last_attempt: Option<Instant>,
+    pub fail_streak: u32,
+    pub is_reported: bool,
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // ConnectionPool
 ////////////////////////////////////////////////////////////////////////////////
@@ -364,6 +534,7 @@ pub struct ConnectionPool {
     raft_ids: UnsafeCell<HashMap<InstanceId, RaftId>>,
     peer_addresses: PeerAddresses,
     instances: Instances,
+    pub(crate) instance_reachability: InstanceReachabilityManagerRef,
 }
 
 impl ConnectionPool {
@@ -375,6 +546,7 @@ impl ConnectionPool {
             raft_ids: Default::default(),
             peer_addresses: storage.peer_addresses,
             instances: storage.instances,
+            instance_reachability: Default::default(),
         }
     }
 
@@ -405,6 +577,7 @@ impl ConnectionPool {
                 instance_id.clone(),
                 self.peer_addresses.clone(),
                 self.worker_options.clone(),
+                self.instance_reachability.clone(),
             )?;
             if let Some(instance_id) = instance_id {
                 let raft_ids = unsafe { &mut *self.raft_ids.get() };
diff --git a/src/traft/node.rs b/src/traft/node.rs
index c1577da674..b1b09c9f5f 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -26,6 +26,8 @@ use crate::traft;
 use crate::traft::error::Error;
 use crate::traft::event;
 use crate::traft::event::Event;
+use crate::traft::network::InstanceReachabilityManager;
+use crate::traft::network::WorkerOptions;
 use crate::traft::notify::{notification, Notifier, Notify};
 use crate::traft::op::{Acl, Ddl, Dml, Op, OpResult};
 use crate::traft::ConnectionPool;
@@ -69,8 +71,6 @@ use std::rc::Rc;
 use std::time::Duration;
 use ApplyEntryResult::*;
 
-use super::network::WorkerOptions;
-
 type RawNode = raft::RawNode<RaftSpaceAccess>;
 
 ::tarantool::define_str_enum! {
@@ -167,7 +167,12 @@ impl Node {
             call_timeout: MainLoop::TICK.saturating_mul(4),
             ..Default::default()
         };
-        let pool = Rc::new(ConnectionPool::new(storage.clone(), opts));
+        let mut pool = ConnectionPool::new(storage.clone(), opts);
+        let instance_reachability = Rc::new(RefCell::new(InstanceReachabilityManager::new(
+            storage.clone(),
+        )));
+        pool.instance_reachability = instance_reachability;
+        let pool = Rc::new(pool);
 
         let node_impl = NodeImpl::new(pool.clone(), storage.clone(), raft_storage.clone())?;
 
@@ -393,6 +398,7 @@ pub(crate) struct NodeImpl {
     pool: Rc<ConnectionPool>,
     lc: LogicalClock,
     status: watch::Sender<Status>,
+    instance_reachability: Rc<RefCell<InstanceReachabilityManager>>,
 }
 
 impl NodeImpl {
@@ -436,6 +442,7 @@ impl NodeImpl {
             joint_state_latch: KVCell::new(),
             storage,
             raft_storage,
+            instance_reachability: pool.instance_reachability.clone(),
             pool,
             lc,
             status,
@@ -1161,7 +1168,13 @@ impl NodeImpl {
 
     /// Is called during a transaction
     fn handle_messages(&mut self, messages: Vec<raft::Message>) {
+        let instance_reachability = self.instance_reachability.borrow();
         for msg in messages {
+            if msg.msg_type == raft::MessageType::MsgHeartbeat
+                && !instance_reachability.should_send_heartbeat_this_tick(msg.to)
+            {
+                continue;
+            }
             if let Err(e) = self.pool.send(msg) {
                 tlog!(Error, "{e}");
             }
@@ -1189,6 +1202,28 @@ impl NodeImpl {
         expelled: &mut bool,
         storage_changes: &mut StorageChanges,
     ) {
+        // Handle any unreachable nodes from previous iteration.
+        let unreachables = self
+            .instance_reachability
+            .borrow_mut()
+            .take_unreachables_to_report();
+        for raft_id in unreachables {
+            self.raw_node.report_unreachable(raft_id);
+
+            // TODO: remove infos when instances are expelled.
+            let Some(pr) = self.raw_node.raft.mut_prs().get_mut(raft_id) else { continue; };
+            // NOTE: Raft-rs will not check if the node should be paused until
+            // a new raft entry is appended to the log. This means that once an
+            // instance goes silent it will still be bombarded with heartbeats
+            // until someone proposes an operation. This is a workaround for
+            // that particular case.
+            // The istance's state would've only changed if it was not in the
+            // Snapshot state, so we have to check for that.
+            if pr.state == ::raft::ProgressState::Probe {
+                pr.pause();
+            }
+        }
+
         // Get the `Ready` with `RawNode::ready` interface.
         if !self.raw_node.has_ready() {
             return;
-- 
GitLab