From d37b35c708927e4b1f5847feeafc2362ca3d21b9 Mon Sep 17 00:00:00 2001
From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com>
Date: Wed, 20 Apr 2022 12:18:06 +0300
Subject: [PATCH] Refactor synchronous traft::Node calls

Use common `Notify` instead of a cond. It makes the code more clear and
reliable, because each request is tracked individually and can't be
affected by different yields in the main_loop.
---
 src/traft/node.rs | 72 +++++++++++++++++++++++++++--------------------
 1 file changed, 42 insertions(+), 30 deletions(-)

diff --git a/src/traft/node.rs b/src/traft/node.rs
index 3c8f01d2a9..b7eb03a8b8 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -1,3 +1,10 @@
+//! This module incapsulates most of the application-specific logics.
+//!
+//! It's responsible for
+//! - handling proposals,
+//! - handling configuration changes,
+//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.
+
 use ::raft::prelude as raft;
 use ::raft::Error as RaftError;
 use ::tarantool::error::TransactionError;
@@ -48,40 +55,46 @@ pub struct Status {
     pub raft_state: String,
 }
 
+/// The heart of `traft` module - the Node.
 #[derive(Debug)]
 pub struct Node {
     _main_loop: fiber::UnitJoinHandle<'static>,
     _join_loop: fiber::UnitJoinHandle<'static>,
-    loop_cond: Rc<fiber::Cond>,
     main_inbox: Mailbox<NormalRequest>,
     join_inbox: Mailbox<(JoinRequest, Notify)>,
     status: Rc<RefCell<Status>>,
     status_cond: Rc<fiber::Cond>,
 }
 
+/// A request to the raft main loop.
 #[derive(Clone, Debug)]
 enum NormalRequest {
-    ProposeNormal {
-        op: traft::Op,
-        notify: Notify,
-    },
+    /// Propose `raft::prelude::Entry` of `EntryNormal` kind.
+    /// Make a notification when it's committed.
+    ProposeNormal { op: traft::Op, notify: Notify },
+    /// Propose `raft::prelude::Entry` of `EntryConfChange` kind.
+    /// Make a notification when it's committed.
     ProposeConfChange {
         peers: Vec<traft::Peer>,
         notify: Notify,
     },
-    ReadIndex {
-        notify: Notify,
-    },
-    Campaign(),
+    /// Get a read barrier. In some systems it's also called the "quorum read".
+    /// Make a notification when index is read.
+    ReadIndex { notify: Notify },
+    /// Start a new raft term .
+    /// Make a notification when request is processed.
+    Campaign { notify: Notify },
+    /// Handle message from anoher raft node.
     Step(raft::Message),
-    Tick(u32),
+    /// Tick the node.
+    /// Make a notification when request is processed.
+    Tick { n_times: u32, notify: Notify },
 }
 
 impl Node {
     pub const TICK: Duration = Duration::from_millis(100);
 
     pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
-        let loop_cond = Rc::new(fiber::Cond::new());
         let status_cond = Rc::new(fiber::Cond::new());
         let main_inbox = Mailbox::<NormalRequest>::new();
         let join_inbox = Mailbox::<(JoinRequest, Notify)>::new();
@@ -93,11 +106,10 @@ impl Node {
         }));
 
         let main_loop_fn = {
-            let loop_cond = loop_cond.clone();
             let status = status.clone();
             let status_cond = status_cond.clone();
             let main_inbox = main_inbox.clone();
-            move || raft_main_loop(loop_cond, main_inbox, status, status_cond, raw_node)
+            move || raft_main_loop(main_inbox, status, status_cond, raw_node)
         };
 
         let join_loop_fn = {
@@ -107,7 +119,6 @@ impl Node {
         };
 
         let node = Node {
-            loop_cond: loop_cond.clone(),
             main_inbox,
             join_inbox,
             status,
@@ -117,7 +128,7 @@ impl Node {
         };
 
         // Wait for the node to enter the main loop
-        loop_cond.wait();
+        node.tick(0);
         Ok(node)
     }
 
@@ -160,9 +171,12 @@ impl Node {
     }
 
     pub fn campaign(&self) {
-        let req = NormalRequest::Campaign();
+        let (rx, tx) = fiber::Channel::new(1).into_clones();
+        let req = NormalRequest::Campaign { notify: tx };
         self.main_inbox.send(req);
-        self.loop_cond.wait();
+        if let Some(Err(e)) = rx.recv() {
+            tlog!(Error, "{e}");
+        }
     }
 
     pub fn step(&self, msg: raft::Message) {
@@ -171,9 +185,13 @@ impl Node {
     }
 
     pub fn tick(&self, n_times: u32) {
-        let req = NormalRequest::Tick(n_times);
+        let (rx, tx) = fiber::Channel::new(1).into_clones();
+        let req = NormalRequest::Tick {
+            n_times,
+            notify: tx,
+        };
         self.main_inbox.send(req);
-        self.loop_cond.wait();
+        rx.recv();
     }
 
     pub fn timeout_now(&self) {
@@ -196,7 +214,6 @@ impl Node {
 }
 
 fn raft_main_loop(
-    loop_cond: Rc<fiber::Cond>,
     main_inbox: Mailbox<NormalRequest>,
     status: Rc<RefCell<Status>>,
     status_cond: Rc<fiber::Cond>,
@@ -222,8 +239,6 @@ fn raft_main_loop(
         LogicalClock::new(id, gen)
     };
 
-    loop_cond.broadcast();
-
     loop {
         // Clean up obsolete notifications
         notifications.retain(|_, notify: &mut Notify| !notify.is_closed());
@@ -288,20 +303,20 @@ fn raft_main_loop(
                     raw_node.read_index(ctx);
                     notifications.insert(lc.clone(), notify);
                 }
-                NormalRequest::Campaign() => {
-                    if let Err(e) = raw_node.campaign() {
-                        tlog!(Error, "{e}");
-                    }
+                NormalRequest::Campaign { notify } => {
+                    let res = raw_node.campaign().map(|_| 0);
+                    notify.try_send(res).expect("that's a bug");
                 }
                 NormalRequest::Step(msg) => {
                     if let Err(e) = raw_node.step(msg) {
                         tlog!(Error, "{e}");
                     }
                 }
-                NormalRequest::Tick(n_times) => {
+                NormalRequest::Tick { n_times, notify } => {
                     for _ in 0..n_times {
                         raw_node.tick();
                     }
+                    notify.try_send(Ok(0)).expect("that's a bug");
                 }
             }
         }
@@ -314,7 +329,6 @@ fn raft_main_loop(
 
         // Get the `Ready` with `RawNode::ready` interface.
         if !raw_node.has_ready() {
-            loop_cond.broadcast();
             continue;
         }
 
@@ -486,8 +500,6 @@ fn raft_main_loop(
             Ok(())
         })
         .unwrap();
-
-        loop_cond.broadcast();
     }
 }
 
-- 
GitLab