From 8aafc1105348e46d9e87693cdf93c734e08f2b1b Mon Sep 17 00:00:00 2001
From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com>
Date: Mon, 19 Sep 2022 11:45:50 +0300
Subject: [PATCH] refactor: implement joint_state_latch using KVCell

---
 src/traft/node.rs | 77 +++++++++++++++--------------------------------
 1 file changed, 24 insertions(+), 53 deletions(-)

diff --git a/src/traft/node.rs b/src/traft/node.rs
index 582eef18dc..46c26b6335 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -16,7 +16,7 @@ use ::tarantool::fiber::{Cond, Mutex};
 use ::tarantool::proc;
 use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
-use std::cell::{Cell, RefCell};
+use std::cell::RefCell;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::convert::TryFrom;
@@ -33,6 +33,7 @@ use crate::traft::RaftId;
 use crate::traft::RaftIndex;
 use crate::traft::RaftTerm;
 use crate::unwrap_some_or;
+use crate::warn_or_panic;
 use ::tarantool::util::IntoClones as _;
 use protobuf::Message as _;
 use std::iter::FromIterator as _;
@@ -279,6 +280,7 @@ struct NodeImpl {
     pub raw_node: RawNode,
     pub notifications: HashMap<LogicalClock, Notify>,
     topology_cache: KVCell<RaftTerm, Topology>,
+    joint_state_latch: KVCell<RaftIndex, Notify>,
     storage: RaftSpaceAccess,
     pool: ConnectionPool,
     lc: LogicalClock,
@@ -320,6 +322,7 @@ impl NodeImpl {
             raw_node,
             notifications: Default::default(),
             topology_cache: KVCell::new(),
+            joint_state_latch: KVCell::new(),
             storage,
             pool,
             lc,
@@ -516,15 +519,14 @@ impl NodeImpl {
         let last_index = self.raw_node.raft.raft_log.last_index();
         assert_eq!(last_index, prev_index + 1);
 
+        if !self.joint_state_latch.is_empty() {
+            warn_or_panic!("joint state latch is locked");
+        }
+
         let (rx, tx) = Notify::new().into_clones();
-        with_joint_state_latch(|joint_state_latch| {
-            assert!(joint_state_latch.take().is_none());
-            event::broadcast(Event::JointStateEnter);
-            joint_state_latch.set(Some(JointStateLatch {
-                index: last_index,
-                notify: tx,
-            }));
-        });
+        self.joint_state_latch.insert(last_index, tx);
+        event::broadcast(Event::JointStateEnter);
+
         Ok(rx)
     }
 
@@ -591,33 +593,24 @@ impl NodeImpl {
             }
         }
 
-        with_joint_state_latch(|joint_state_latch| {
-            if let Some(latch) = joint_state_latch.take() {
-                if entry.index != latch.index {
-                    joint_state_latch.set(Some(latch));
-                    return;
-                }
-
-                // It was expected to be a ConfChange entry, but it's
-                // normal. Raft must have overriden it, or there was
-                // a re-election.
-                let e = RaftError::ConfChangeError("rolled back".into());
+        if let Some(notify) = self.joint_state_latch.take_or_keep(&entry.index) {
+            // It was expected to be a ConfChange entry, but it's
+            // normal. Raft must have overriden it, or there was
+            // a re-election.
+            let e = RaftError::ConfChangeError("rolled back".into());
 
-                latch.notify.notify_err(e);
-                event::broadcast(Event::JointStateDrop);
-            }
-        });
+            notify.notify_err(e);
+            event::broadcast(Event::JointStateDrop);
+        }
     }
 
     /// Is called during a transaction
     fn handle_committed_conf_change(&mut self, entry: traft::Entry) {
-        let latch_unlock = || {
-            with_joint_state_latch(|joint_state_latch| {
-                if let Some(latch) = joint_state_latch.take() {
-                    latch.notify.notify_ok(());
-                    event::broadcast(Event::JointStateLeave);
-                }
-            });
+        let mut latch_unlock = || {
+            if let Some(notify) = self.joint_state_latch.take() {
+                notify.notify_ok(());
+                event::broadcast(Event::JointStateLeave);
+            }
         };
 
         // Beware: a tiny difference in type names (`V2` or not `V2`)
@@ -802,28 +795,6 @@ impl NodeImpl {
     }
 }
 
-#[derive(Debug)]
-struct JointStateLatch {
-    /// Index of the latest ConfChange entry proposed.
-    /// Helps detecting when the entry is overridden
-    /// due to a re-election.
-    index: RaftIndex,
-
-    /// Make a notification when the latch is unlocked.
-    /// Notification is a `Result<Box<()>>`.
-    notify: Notify,
-}
-
-fn with_joint_state_latch<F, R>(f: F) -> R
-where
-    F: FnOnce(&Cell<Option<JointStateLatch>>) -> R,
-{
-    thread_local! {
-        static JOINT_STATE_LATCH: Cell<Option<JointStateLatch>> = Cell::new(None);
-    }
-    JOINT_STATE_LATCH.with(f)
-}
-
 fn raft_main_loop(
     status: Rc<RefCell<Status>>,
     node_impl: Rc<Mutex<NodeImpl>>,
-- 
GitLab